[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-683602732

@junrao the result of ```benchmark_test.py``` is attached (see description). The main regression ({"records_per_sec": 3653635.3672, "mb_per_sec": 348.4378} -> {"records_per_sec": 2992220.2274, "mb_per_sec": 285.3604}) happens in the case ```test_id: kafkatest.benchmarks.core.benchmark_test.Benchmark.test_consumer_throughput.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=snappy```. I re-ran the case 5 times and the throughput of that case does not appear to be stable.

**BEFORE**
1. {"records_per_sec": 3653635.3672, "mb_per_sec": 348.4378}
2. {"records_per_sec": 3812428.517, "mb_per_sec": 363.5815}
3. {"records_per_sec": 3012048.1928, "mb_per_sec": 287.2513}
4. {"records_per_sec": 3182686.1871, "mb_per_sec": 303.5246}
5. {"records_per_sec": 2997601.9185, "mb_per_sec": 285.8736}

**AFTER**
1. {"records_per_sec": 2992220.2274, "mb_per_sec": 285.3604}
2. {"records_per_sec": 3698224.8521, "mb_per_sec": 352.6902}
3. {"records_per_sec": 2977076.5109, "mb_per_sec": 283.9161}
4. {"records_per_sec": 3676470.5882, "mb_per_sec": 350.6156}
5. {"records_per_sec": 3681885.1252, "mb_per_sec": 351.1319}
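As a quick arithmetic check (a throwaway sketch, not part of the PR or the test suite), averaging the five runs shows the BEFORE and AFTER means land within a few percent of each other despite the per-run variance:

```java
public class ThroughputAverages {
    public static void main(String[] args) {
        // records_per_sec for the five BEFORE and five AFTER runs quoted above.
        final double[] before = {3653635.3672, 3812428.517, 3012048.1928, 3182686.1871, 2997601.9185};
        final double[] after = {2992220.2274, 3698224.8521, 2977076.5109, 3676470.5882, 3681885.1252};
        System.out.printf("before avg: %.1f%n", avg(before)); // ~3331680.0
        System.out.printf("after  avg: %.1f%n", avg(after));  // ~3405175.5
    }

    static double avg(final double[] xs) {
        double sum = 0;
        for (final double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }
}
```

The AFTER mean is in fact slightly above the BEFORE mean, consistent with the regression in the single quoted run being noise.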
[GitHub] [kafka] cadonna commented on pull request #9177: KAFKA-9924: Add RocksDB metric num-entries-active-mem-table
cadonna commented on pull request #9177: URL: https://github.com/apache/kafka/pull/9177#issuecomment-683631452

@vvcephei Thank you very much for taking care of the conflicts and merging the PR!
[GitHub] [kafka] cadonna commented on pull request #9191: KAFKA-10355: Throw error when source topic was deleted
cadonna commented on pull request #9191: URL: https://github.com/apache/kafka/pull/9191#issuecomment-683673488

This is the implementation for KIP-662.
[GitHub] [kafka] ijuma commented on a change in pull request #9231: KAFKA-10447: Migrate tools module to JUnit 5 and mockito
ijuma commented on a change in pull request #9231: URL: https://github.com/apache/kafka/pull/9231#discussion_r480023644

## File path: tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
## @@ -62,17 +62,18 @@
 import java.util.List;
 import java.util.Optional;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+@Tag("integration")

Review comment: This test is much slower than the rest in `tools`, so I think it makes sense to mark it as `integration`.
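For readers unfamiliar with this migration, a minimal sketch (hypothetical test class, not code from the PR) of the JUnit 4 to JUnit 5 idioms involved: `@Tag` replaces categories for test filtering, and `assertThrows` replaces `@Test(expected = ...)` and try/`fail` patterns:

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("integration") // JUnit 5 tags let the build filter slow tests, as done above
class MigrationStyleTest {

    @Test
    void convertsExpectedExceptionToAssertThrows() {
        // JUnit 4: @Test(expected = IllegalArgumentException.class)
        // JUnit 5: assertThrows returns the exception so follow-up assertions are possible.
        final IllegalArgumentException e = assertThrows(
            IllegalArgumentException.class,
            () -> { throw new IllegalArgumentException("bad argument"); });
        assertEquals("bad argument", e.getMessage());
    }
}
```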
[GitHub] [kafka] cadonna opened a new pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607
cadonna opened a new pull request #9232: URL: https://github.com/apache/kafka/pull/9232

This commit adds the remaining property-based RocksDB metrics as described in KIP-607, except for num-entries-active-mem-table, which was added in PR #9177.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
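As a hedged illustration of how such metrics can be observed at runtime (not code from the PR; the metric name comes from KIP-607, but the exact tags printed are an assumption), one can scan `KafkaStreams#metrics()`:

```java
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public final class RocksDbMetricsProbe {
    // Prints the value of a KIP-607 RocksDB property metric for every state store.
    // Assumes `streams` is a running KafkaStreams instance backed by RocksDB stores.
    public static void printMemTableEntries(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().equals("num-entries-active-mem-table")) {
                System.out.println(entry.getKey().tags() + " -> " + entry.getValue().metricValue());
            }
        }
    }
}
```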
[GitHub] [kafka] cadonna commented on pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607
cadonna commented on pull request #9232: URL: https://github.com/apache/kafka/pull/9232#issuecomment-683729711

Call for review: @guozhangwang @vvcephei
[GitHub] [kafka] rondagostino opened a new pull request #9233: Testing
rondagostino opened a new pull request #9233: URL: https://github.com/apache/kafka/pull/9233

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rondagostino closed pull request #9233: Testing
rondagostino closed pull request #9233: URL: https://github.com/apache/kafka/pull/9233
[jira] [Commented] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size
[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747 ]

Sören Henning commented on KAFKA-9649:
--------------------------------------

Hi, sorry for the late response, vacation came up... :)

In our case, we observed this issue when grouping a windowed KTable by a new key for subsequent aggregation:
{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
    .groupBy(
        (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
        Grouped.with(
            new WindowedSerdes.TimeWindowedSerde<>(
                myTableXYAttributeSerde,
                myTableWindowSize),
            myTableValueSerde));
{noformat}
Here, we have a windowed KTable with keys of type {{K}} and want to group it by a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}. When {{myTableWindowSize}} is not passed to the {{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which are actually of type {{Windowed<KNew>}}, are not assigned the correct end timestamps. This issue does not become immediately apparent; only a log message is produced:
{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}

> Remove/Warn on use of TimeWindowedSerde with no specified window size
> ----------------------------------------------------------------------
>
> Key: KAFKA-9649
> URL: https://issues.apache.org/jira/browse/KAFKA-9649
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Sören Henning
> Priority: Major
>
> The API of the [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java] promotes its construction without specifying a window size:
> {noformat}
> public TimeWindowedSerde(final Serde<T> inner)
> {noformat}
> While code using this constructor looks absolutely clean, it leads to fatal errors at runtime, which turned out to be very hard to discover.
> The reason for these errors can be found in the construction of the [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java], which is created via:
> {noformat}
> // TODO: fix this part as last bits of KAFKA-4468
> public TimeWindowedDeserializer(final Deserializer<T> inner) {
>     this(inner, Long.MAX_VALUE);
> }
> {noformat}
> The TODO comment suggests that this issue is (or at least was) already known.
> We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} constructor or at least warning when it is used (if required for backwards compatibility). The ideal solution, of course, would be to get the window size from some externally provided context; however, I expect this to be difficult to realize. The same applies to the {{TimeWindowedDeserializer(final Deserializer<T> inner)}} constructor.
> A further minor suggestion in this context: as most Kafka Streams time declarations now use {{Duration}}s instead of long-encoded milliseconds, I suggest allowing window sizes to be specified with a {{Duration}}.
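To make the fix concrete, a minimal sketch (hypothetical serde types and window size; not code from the ticket) of constructing the serde with an explicit window size, which avoids the `Long.MAX_VALUE` fallback described above:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;

public final class WindowedSerdeExample {
    public static Serde<Windowed<String>> windowedKeySerde() {
        // The window size must match the windows used upstream; 5 minutes is assumed here.
        final long windowSizeMs = Duration.ofMinutes(5).toMillis();

        // Unsafe: new WindowedSerdes.TimeWindowedSerde<>(Serdes.String()) silently
        // falls back to Long.MAX_VALUE as the window size, corrupting window end times.

        // Safe: pass the window size explicitly.
        return new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), windowSizeMs);
    }
}
```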
[jira] [Comment Edited] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size
[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187747#comment-17187747 ]

Sören Henning edited comment on KAFKA-9649 at 8/31/20, 1:49 PM:
----------------------------------------------------------------

Hi, sorry for the late response, vacation came up... :)

In our case, we observed this issue when grouping a windowed KTable by a new key for a subsequent aggregation:
{noformat}
KTable<Windowed<K>, V> myTable = //...
KGroupedTable<Windowed<KNew>, V> myGroupedTable = myTable
    .groupBy(
        (k, v) -> KeyValue.pair(new Windowed<>(k.key().getXYAttribute(), k.window()), v),
        Grouped.with(
            new WindowedSerdes.TimeWindowedSerde<>(
                myTableXYAttributeSerde,
                myTableWindowSize),
            myTableValueSerde));
{noformat}
Here, we have a windowed KTable with keys of type {{K}} and want to group it by a certain attribute ({{XY}}) of the key. This attribute is of type {{KNew}}. When {{myTableWindowSize}} is not passed to the {{WindowedSerdes.TimeWindowedSerde}} constructor, the new KTable keys, which are actually of type {{Windowed<KNew>}}, are not assigned the correct end timestamps. This issue does not become immediately apparent; only a log message is produced:
{noformat}
WARN org.apache.kafka.streams.state.internals.WindowKeySchema - Warning: window end time was truncated to Long.MAX
{noformat}

was (Author: soerenhenning): (previous revision of the same comment, differing only in minor wording; full issue description quoted in the preceding message)
[GitHub] [kafka] jeqo commented on pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
jeqo commented on pull request #9138: URL: https://github.com/apache/kafka/pull/9138#issuecomment-683830757

@ableegoldman key ordering is added to `InMemoryWindowStore` now
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r480194412

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
## @@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String storeName;
+    private final SlidingWindows windows;
+    private final Initializer<Agg> initializer;
+    private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+    private boolean sendOldValues = false;
+
+    public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+                                         final String storeName,
+                                         final Initializer<Agg> initializer,
+                                         final Aggregator<? super K, ? super V, Agg> aggregator) {
+        this.windows = windows;
+        this.storeName = storeName;
+        this.initializer = initializer;
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamSlidingWindowAggregateProcessor();
+    }
+
+    public SlidingWindows windows() {
+        return windows;
+    }
+
+    @Override
+    public void enableSendingOldValues() {
+        sendOldValues = true;
+    }
+
+    private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+        private TimestampedWindowStore<K, Agg> windowStore;
+        private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+        private StreamsMetricsImpl metrics;
+        private InternalProcessorContext internalProcessorContext;
+        private Sensor lateRecordDropSensor;
+        private Sensor droppedRecordsSensor;
+        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+            internalProcessorContext = (InternalProcessorContext) context;
+            metrics = internalProcessorContext.metrics();
+            final String threadId = Thread.currentThread().getName();
+            lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+                threadId,
+                context.taskId().toString(),
+                internalProcessorContext.currentNode().name(),
+                metrics
+            );
+            droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+            windowStore = (TimestampedWindowStore<K, Agg>) context.getStateStore(storeName);
+            tupleForwarder = new TimestampedTupleFo
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-683859366

@chia7712 : Thanks for the performance results. It seems that the average across multiple runs doesn't change much? Also, 1 failure in the latest system test run. http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-30--001.1598849124--chia7712--fix_8334_avoid_deadlock--960f19b29/report.html
[jira] [Updated] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context
[ https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-10448:
------------------------------------
Labels: needs-kip (was: )

> Preserve Source Partition in Kafka Streams from context
> --------------------------------------------------------
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.5.0
> Reporter: satya
> Priority: Minor
> Labels: needs-kip
>
> Currently, Kafka Streams sink nodes use the default partitioner or allow a custom partitioner, which can only depend on the key/value. I am looking for an enhancement of the sink node to ensure the source partition is preserved instead of being derived again from the key/value. One of our use cases has producers with custom partitioners that we don't have access to, as it is a third-party application. Simply preserving the partition through context.partition() would be helpful.
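A hedged sketch of the kind of workaround available with the current API (all class names are hypothetical; this is not from the ticket): capture `context.partition()` in a transformer, carry it in the value, and echo it back from a custom `StreamPartitioner` at the sink:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;

// Wraps a value together with the partition it was read from.
class PartitionedValue<V> {
    final int sourcePartition;
    final V value;

    PartitionedValue(final int sourcePartition, final V value) {
        this.sourcePartition = sourcePartition;
        this.value = value;
    }
}

// Transformer that records context.partition() for each input record.
class CapturePartition<K, V> implements Transformer<K, V, KeyValue<K, PartitionedValue<V>>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, PartitionedValue<V>> transform(final K key, final V value) {
        return KeyValue.pair(key, new PartitionedValue<>(context.partition(), value));
    }

    @Override
    public void close() {}
}

// Sink-side partitioner that reuses the captured source partition.
class PreservePartition<K, V> implements StreamPartitioner<K, PartitionedValue<V>> {
    @Override
    public Integer partition(final String topic, final K key, final PartitionedValue<V> value, final int numPartitions) {
        // Assumes the output topic has at least as many partitions as the source.
        return value.sourcePartition % numPartitions;
    }
}
```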
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-683863356

> It seems that the average across multiple runs doesn't change much?

Yep. I didn't observe an obvious regression caused by this patch.

> Also, 1 failure in the latest system test run.

```kafkatest.tests.connect.connect_distributed_test``` was flaky (see https://issues.apache.org/jira/browse/KAFKA-10289)
[jira] [Updated] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context
[ https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-10448:
------------------------------------
Priority: Minor (was: Critical)
[jira] [Created] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
Jigar Naik created KAFKA-10450:
----------------------------------

Summary: console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no in-flight requests for node -1
Key: KAFKA-10450
URL: https://issues.apache.org/jira/browse/KAFKA-10450
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 2.6.0
Environment: Kafka Version 2.6.0
MacOS Version - macOS Catalina 10.15.6 (19G2021)
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
Reporter: Jigar Naik

kafka-console-producer.sh gives the below error on Mac:

ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no in-flight requests for node -1

*Steps to reproduce the issue.*

Download Kafka from [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz]

Change data and log directory (Optional)

Create a topic using the below command:
{code:java}
./kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-topic{code}

Start a Kafka console consumer using the below command:
{code:java}
./kafka-console-consumer.sh \
  --topic my-topic \
  --from-beginning \
  --bootstrap-server localhost:9092{code}

Starting the Kafka console producer then gives the below output:
{code:java}
./kafka-console-producer.sh \
  --topic my-topic \
  --bootstrap-server 127.0.0.1:9092
>[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender)
java.nio.BufferUnderflowException
 at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650)
 at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391)
 at org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43)
 at org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102)
 at org.apache.kafka.common.message.ResponseHeaderData.<init>(ResponseHeaderData.java:70)
 at org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66)
 at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
 at java.base/java.lang.Thread.run(Thread.java:834)
[2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender)
java.lang.IllegalStateException: There are no in-flight requests for node -1
 at org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62)
 at org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70)
 at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
 at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
 at java.base/java.lang.Thread.run(Thread.java:834)
[2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
{code}

The same steps work fine with Kafka version 2.0.0 on Mac.
The same steps work fine with Kafka version 2.6.0 on Windows.
[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jigar Naik updated KAFKA-10450:
-------------------------------
Priority: Blocker (was: Major)
[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jigar Naik updated KAFKA-10450:
-------------------------------
Priority: Critical (was: Blocker)
[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jigar Naik updated KAFKA-10450:
-------------------------------
Priority: Blocker (was: Critical)
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r480265451

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
## @@ -367,14 +406,37 @@ public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedExcept
         time.sleep(5000);
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2");
+        consumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "primary.test-topic-1", "primary.test-topic-2");
         records = consumer.poll(Duration.ofMillis(500));
         // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
         assertEquals("consumer record size is not zero", 0, records.count());
         consumer.close();
+    }
+
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int partitions, String msgPrefix) {
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            // produce to all partitions but the last one

Review comment: Updated.
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r480266459

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
## @@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one

Review comment: I've separated the test from the existing ones, also using a different topic. Some of the logic in those tests is complex and may be hard to follow, so I thought it would be better to keep the tests totally separate and simpler to interpret. I think, and hope, it's easier to understand now than it was before.
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r480267122

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
## @@ -128,10 +136,23 @@ public void setup() throws InterruptedException {
         backup.kafka().createTopic("primary.test-topic-1", 1);
         backup.kafka().createTopic("heartbeats", 1);
-        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
-            primary.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-1-" + i);
-            backup.kafka().produce("test-topic-1", i % NUM_PARTITIONS, "key", "message-2-" + i);
-        }
+        // produce to all partitions but the last one
+        produceMessages(primary, "test-topic-1", NUM_PARTITIONS - 1, "message-1-");
+        produceMessages(backup, "test-topic-1", NUM_PARTITIONS - 1, "message-2-");
+
+        consumerProps = new HashMap() {{

Review comment: I changed the way this is done, setting it at the test level, with test-specific consumer groups.
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r480267347

## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
## @@ -190,24 +211,19 @@ public void close() {
     public void testReplication() throws InterruptedException {
         // create consumers before starting the connectors so we don't need to wait for discovery
-        Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "backup.test-topic-1");
+        Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "backup.test-topic-1");
         consumer1.poll(Duration.ofMillis(500));
         consumer1.commitSync();
         consumer1.close();
-        Consumer consumer2 = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", "consumer-group-1"), "test-topic-1", "primary.test-topic-1");
+        Consumer consumer2 = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1", "primary.test-topic-1");

Review comment: Good catch. Also changed this, and we're now consuming it only once.
[GitHub] [kafka] asdaraujo commented on pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on pull request #8730: URL: https://github.com/apache/kafka/pull/8730#issuecomment-683909598

@mimaison Thanks for the feedback. I've refactored the tests. Could you please give them another review?
[jira] [Created] (KAFKA-10451) system tests send large command over ssh instead of using remote file for security config
Ron Dagostino created KAFKA-10451:
-------------------------------------

Summary: system tests send large command over ssh instead of using remote file for security config
Key: KAFKA-10451
URL: https://issues.apache.org/jira/browse/KAFKA-10451
Project: Kafka
Issue Type: Improvement
Components: system tests
Reporter: Ron Dagostino

In `kafka.py` the pattern used to supply security configuration information to remote CLI tools is to send the information as part of the ssh command. For example, see this --command-config definition:

{{Running ssh command: export KAFKA_OPTS="-Djava.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf -Djava.security.krb5.conf=/mnt/security/krb5.conf"; /opt/kafka-dev/bin/kafka-configs.sh --bootstrap-server worker2:9095 --command-config <(echo '
ssl.endpoint.identification.algorithm=HTTPS
sasl.kerberos.service.name=kafka
security.protocol=SASL_SSL
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.keystore.password=test-ks-passwd
sasl.mechanism=SCRAM-SHA-256
ssl.truststore.password=test-ts-passwd
ssl.key.password=test-ks-passwd
sasl.mechanism.inter.broker.protocol=GSSAPI
') --entity-name kafka-client --entity-type users --alter --add-config SCRAM-SHA-256=[password=client-secret]}}

This ssh command length is getting pretty big. It would be best if this referred to a file on the remote host instead of sending the file contents as part of the ssh command. This happens in a few places in `kafka.py` and should be rectified.
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480272048

## File path: tests/kafkatest/services/kafka/kafka.py
## @@ -575,10 +577,21 @@ def set_unclean_leader_election(self, topic, value=True, node=None):
         node.account.ssh(cmd)

     def _connect_setting_kafka_configs(self, node):
+        # Use this for everything related to kafka-configs except User SCRAM Credentials
         if node.version.kafka_configs_command_uses_bootstrap_server():
-            return "--bootstrap-server %s " % self.bootstrap_servers(self.security_protocol)
+            return "--bootstrap-server %s --command-config <(echo '%s')" % (self.bootstrap_servers(self.security_protocol),

Review comment: I created https://issues.apache.org/jira/browse/KAFKA-10451 to track this.
[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on pull request #9039: URL: https://github.com/apache/kafka/pull/9039#issuecomment-683916587

test this please
[GitHub] [kafka] ableegoldman commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on pull request #9039: URL: https://github.com/apache/kafka/pull/9039#issuecomment-683917229

Retest this please
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480282139

## File path: core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
## @@ -169,6 +169,18 @@ object JaasTestUtils {
     jaasFile
   }

+  // Returns a SASL/SCRAM configuration using credentials for the given user and password
+  def scramClientLoginModule(mechanism: String, scramUser: String, scramPassword: String): String = {
+    mechanism match {
+      case "SCRAM-SHA-256" | "SCRAM-SHA-512" =>

Review comment: I fixed this in the other places in this file as well.
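For context, a minimal sketch (placeholder credentials, not code from the PR) of the client-side properties that such a SASL/SCRAM login module string feeds into:

```java
import java.util.Properties;

public final class ScramClientConfig {
    // Builds client properties for SASL/SCRAM authentication; the user and
    // password here are placeholders, not real credentials.
    public static Properties scramClientProps(final String user, final String password) {
        final Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }
}
```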
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480301973

## File path: core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
## @@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.server.BaseRequestTest
+import kafka.utils.Exit
+import org.junit.Assert._
+import org.junit.Test
+
+class UserScramCredentialsCommandTest extends BaseRequestTest {
+  override def brokerCount = 1
+  var exitStatus: Option[Int] = None
+  var exitMessage: Option[String] = None
+
+  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
+
+  private def runConfigCommandViaBroker(args: Array[String]): ConfigCommandResult = {
+    val byteArrayOutputStream = new ByteArrayOutputStream()
+    val utf8 = StandardCharsets.UTF_8.name
+    val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
+    var exitStatus: Option[Int] = None
+    Exit.setExitProcedure { (status, _) =>
+      exitStatus = Some(status)
+      throw new RuntimeException
+    }
+    try {
+      Console.withOut(printStream) {
+        ConfigCommand.main(Array("--bootstrap-server", brokerList) ++ args)
+      }
+      ConfigCommandResult(byteArrayOutputStream.toString(utf8))
+    } catch {
+      case e: Exception => {

Review comment: Logging it at debug level doesn't hurt, so I added it.
[jira] [Updated] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size
[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-9649:
-----------------------------------
Labels: kip (was: )
[jira] [Assigned] (KAFKA-9649) Remove/Warn on use of TimeWindowedSerde with no specified window size
[ https://issues.apache.org/jira/browse/KAFKA-9649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-9649: -- Assignee: Leah Thomas > Remove/Warn on use of TimeWindowedSerde with no specified window size > - > > Key: KAFKA-9649 > URL: https://issues.apache.org/jira/browse/KAFKA-9649 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sören Henning >Assignee: Leah Thomas >Priority: Major > Labels: kip > > The API of the > [{{TimeWindowedSerde}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java] > promotes its construction without specifying a window size: > {noformat} > public TimeWindowedSerde(final Serde<T> inner) > {noformat} > While code using this constructor looks absolutely clean, it leads to fatal > errors at runtime, which turned out to be very hard to discover. > The reason for these errors can be found in the construction of the > [{{TimeWindowedDeserializer}}|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java], > which is created via: > {noformat} > // TODO: fix this part as last bits of KAFKA-4468 > public TimeWindowedDeserializer(final Deserializer<T> inner) { > this(inner, Long.MAX_VALUE); > } > {noformat} > The TODO comment suggests that this issue is (or at least was) already known. > We suggest either removing the {{TimeWindowedSerde(final Serde<T> inner)}} > constructor or at least warning when it is used (if required for backward > compatibility). The ideal solution of course would be to get the window size > from some externally provided context. However, I expect this to be difficult > to realize. The same also applies to the {{TimeWindowedDeserializer(final > Deserializer<T> inner)}} constructor. > A further minor suggestion in this context: As most Kafka Streams time > declarations now use {{Duration}}s instead of long-encoded milliseconds, I > suggest allowing window sizes to be specified with a {{Duration}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480324560 ## File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala ## @@ -1047,8 +1047,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @Test def testAddRemoveSaslListeners(): Unit = { -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) +createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) +createScramCredentials(adminClients.head, JaasTestUtils.KafkaScramAdmin, JaasTestUtils.KafkaScramAdminPassword) initializeKerberos() Review comment: Good point. It wasn't waiting before, and it probably didn't/doesn't matter since we were spending time initializing Kerberos, but I added the check anyway just to be safe. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480329570 ## File path: core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala ## @@ -42,7 +42,18 @@ class SaslScramSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTes override def setUp(): Unit = { super.setUp() // Create client credentials after starting brokers so that dynamic credential creation is also tested -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) -createScramCredentials(zkConnect, JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) +createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser, JaasTestUtils.KafkaScramPassword) +createScramCredentialWithScramAdminClient(JaasTestUtils.KafkaScramUser2, JaasTestUtils.KafkaScramPassword2) + } + + private def createScramCredentialWithScramAdminClient(user: String, password: String) = { Review comment: It was a goal to eliminate all SCRAM credential creation via ZooKeeper where possible. The only places that still do so after this PR are those where credentials have to be created before the brokers are started (i.e. when the inter-broker security protocol is SASL/SCRAM). This code used to create the credential directly via ZooKeeper, but since it runs after the brokers start, it can use the admin client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
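For context, a hedged sketch of what credential creation through the broker-side API looks like from plain Java (assuming the KIP-554 Admin methods introduced by this PR; the bootstrap address, user name, and password are placeholders):
```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class CreateScramCredential {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // upsert a SCRAM-SHA-256 credential for user "alice" with 4096 iterations
            admin.alterUserScramCredentials(Collections.singletonList(
                new UserScramCredentialUpsertion("alice",
                    new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096),
                    "alice-password"))
            ).all().get();
        }
    }
}
```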
[jira] [Resolved] (KAFKA-10384) Separate converters from generated messages
[ https://issues.apache.org/jira/browse/KAFKA-10384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-10384. -- Fix Version/s: 2.7.0 Resolution: Fixed > Separate converters from generated messages > --- > > Key: KAFKA-10384 > URL: https://issues.apache.org/jira/browse/KAFKA-10384 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 2.7.0 > > > Separate the JSON converter classes from the message classes, so that the > clients module can be used without Jackson on the CLASSPATH. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10434: - Fix Version/s: 3.0.0 > Remove deprecated methods on WindowStore > > > Key: KAFKA-10434 > URL: https://issues.apache.org/jira/browse/KAFKA-10434 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and > [https://github.com/apache/kafka/pull/9138#discussion_r474995606] : > WindowStore contains ReadOnlyWindowStore methods. > We could consider: > * Moving read methods from WindowStore to ReadOnlyWindowStore and/or > * Removing long-based methods -- This message was sent by Atlassian Jira (v8.3.4#803005)
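For readers following along, a small sketch contrasting the two method families discussed in the ticket (the Instant-based reads on ReadOnlyWindowStore versus the older long-based overloads); the store variable is assumed to come from an interactive query:
```
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class WindowStoreReads {
    static void read(final ReadOnlyWindowStore<String, Long> store) {
        final Instant to = Instant.now();
        final Instant from = to.minus(Duration.ofHours(1));

        // Instant-based read: the variant proposed to be kept
        try (WindowStoreIterator<Long> iter = store.fetch("key", from, to)) {
            iter.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
        }
        // the long-based overloads, e.g. fetch("key", fromMs, toMs), are the
        // deprecated ones the ticket proposes to remove
    }
}
```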
[jira] [Updated] (KAFKA-10434) Remove deprecated methods on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10434: - Priority: Blocker (was: Major) > Remove deprecated methods on WindowStore > > > Key: KAFKA-10434 > URL: https://issues.apache.org/jira/browse/KAFKA-10434 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Priority: Blocker > Labels: needs-kip > > From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and > [https://github.com/apache/kafka/pull/9138#discussion_r474995606] : > WindowStore contains ReadOnlyWindowStore methods. > We could consider: > * Moving read methods from WindowStore to ReadOnlyWindowStore and/or > * Removing long-based methods -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10445) Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-10445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-10445: Assignee: Jorge Esteban Quilcate Otoya > Align IQ SessionStore API with Instant-based methods as ReadOnlyWindowStore > --- > > Key: KAFKA-10445 > URL: https://issues.apache.org/jira/browse/KAFKA-10445 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: beginner, needs-kip, newbie > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10434) Remove deprecated methods on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-10434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187962#comment-17187962 ] John Roesler commented on KAFKA-10434: -- Marked as a 3.0 blocker so we will be sure to consider removing the methods at the time of the 3.0 release. If we decide at that time not to do it yet, we'll just move it to the 4.0 release. > Remove deprecated methods on WindowStore > > > Key: KAFKA-10434 > URL: https://issues.apache.org/jira/browse/KAFKA-10434 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > From [https://github.com/apache/kafka/pull/9138#discussion_r474985997] and > [https://github.com/apache/kafka/pull/9138#discussion_r474995606] : > WindowStore contains ReadOnlyWindowStore methods. > We could consider: > * Moving read methods from WindowStore to ReadOnlyWindowStore and/or > * Removing long-based methods -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10417: - Fix Version/s: 2.6.1 > suppress() with cogroup() throws ClassCastException > --- > > Key: KAFKA-10417 > URL: https://issues.apache.org/jira/browse/KAFKA-10417 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Wardha Perinkada Kattu >Priority: Blocker > Labels: kafka-streams > Fix For: 2.7.0, 2.6.1 > > > Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` > throws `ClassCastException` > Works fine without the `suppress()` > Code block tested - > {code:java} > val stream1 = requestStreams.merge(successStreams).merge(errorStreams) > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.notificationSerde())) > val streams2 = confirmationStreams > .groupByKey(Grouped.with(Serdes.String(), > serdesConfig.confirmationsSerde())) > val cogrouped = > stream1.cogroup(notificationAggregator).cogroup(streams2, > confirmationsAggregator) > > .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong()))) > .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store") > > .withValueSerde(serdesConfig.notificationMetricSerde())) > .suppress(Suppressed.untilWindowCloses(unbounded())) > .toStream() > {code} > Exception thrown is: > {code:java} > Caused by: java.lang.ClassCastException: class > org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to > class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier > (org.apache.kafka.streams.kstream.internals.PassThrough and > org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in > unnamed module of loader 'app') > {code} > [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress] -- This message was sent by Atlassian Jira (v8.3.4#803005)
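A minimal, self-contained Java version of the reported pattern (topic names, types, and store name are made up; per the report, the suppress() call was the part that failed on 2.6.0):
```
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class CogroupSuppressExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KGroupedStream<String, Long> left =
            builder.<String, Long>stream("left-topic").groupByKey();
        final KGroupedStream<String, Long> right =
            builder.<String, Long>stream("right-topic").groupByKey();

        final Aggregator<String, Long, Long> sum = (key, value, agg) -> agg + value;

        left.cogroup(sum)
            .cogroup(right, sum)
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
            .aggregate(() -> 0L,
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("windowed-agg-store")
                    .withValueSerde(Serdes.Long()))
            // per the report, this suppress() triggered the ClassCastException on 2.6.0
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();

        System.out.println(builder.build().describe());
    }
}
```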
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480370634 ## File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ## @@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } + protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = { Review comment: See above This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480370463 ## File path: core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala ## @@ -248,4 +250,25 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") createProducer() } + + private def createScramAdminClient(user: String, password: String): Admin = { Review comment: Ok, I added the following code to SaslSetup, and we implement that first method in the 3 test classes that use this functionality. ``` def createPrivilegedAdminClient(): Admin = { // create an admin client instance that is authorized to create credentials throw new UnsupportedOperationException("Must implement this if a test needs to use it") } def createScramCredentialsViaPrivilegedAdminClient(userName: String, password: String): Unit = { val privilegedAdminClient = createPrivilegedAdminClient() // must explicitly implement this method try { // create the SCRAM credential for the given user createScramCredentials(privilegedAdminClient, userName, password) } finally { privilegedAdminClient.close() } } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10429) Group Coordinator unavailability leads to missing events
[ https://issues.apache.org/jira/browse/KAFKA-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187992#comment-17187992 ] John Roesler commented on KAFKA-10429: -- Hi Navinder, My first thought is that version 1.1.1 is extremely old, and a lot has actually changed in the consumers since then. Is there any chance you can try with a newer version of Streams and see if you still observe the issue? Aside from that, from the logs you posted, it looks like it only took that instance a few seconds to re-acquire the connection to the coordinator, but the next paragraph implies that disconnections have lasted hours. Can you clarify? A few other notes: * Disconnecting from the coordinator shouldn't interrupt processing, since you can still fetch from the leader and followers of the topic partitions you're assigned * If an instance is disconnected for longer than the session interval, you would actually see rebalances caused by that instance having dropped out of the group * If the log cleaner removes some offsets after the consumer's current position, there would be an InvalidOffsetException (unless there's an auto-reset policy configured), so you wouldn't silently miss data > Group Coordinator unavailability leads to missing events > > > Key: KAFKA-10429 > URL: https://issues.apache.org/jira/browse/KAFKA-10429 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1 >Reporter: Navinder Brar >Priority: Major > > We are regularly getting this Exception in logs. > [2020-08-25 03:24:59,214] INFO [Consumer > clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group > coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, > will attempt rediscovery > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > > And after some time it becomes discoverable: > [2020-08-25 03:25:02,218] INFO [Consumer > clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, > groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: > null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > > Now, the doubt I have is why this unavailability doesn't trigger a rebalance > in the cluster. We have a few hours of retention on the source Kafka Topics, and > sometimes this unavailability lasts for more than a few hours. Since it > doesn't trigger a rebalance or stop processing on other nodes (which are > connected to the GC), we never come to know that some issue has happened, and until > then we lose events from our source topics. > > There are some resolutions mentioned on stackoverflow but those configs are > already set in our kafka: > default.replication.factor=3 > offsets.topic.replication.factor=3 > > It would be great to understand why this issue is happening, why it > doesn't trigger a rebalance, and whether there is any known solution for it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
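On the last point, a hedged sketch of the consumer settings involved (values are illustrative): with auto.offset.reset set to "none", a position that falls off the log surfaces as an exception rather than a silent jump:
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetConfig {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // "none": throw instead of silently resetting when the committed
        // position has already been cleaned from the log
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        // session timeout governs how long a disconnected member stays in the group
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        return props;
    }
}
```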
[jira] [Created] (KAFKA-10452) Only expire preferred read replica if a leader is alive for the topic
Jeff Kim created KAFKA-10452: Summary: Only expire preferred read replica if a leader is alive for the topic Key: KAFKA-10452 URL: https://issues.apache.org/jira/browse/KAFKA-10452 Project: Kafka Issue Type: Bug Components: clients Reporter: Jeff Kim Assignee: Jeff Kim Fetch-from-follower functionality periodically expires and refreshes the preferred read replica (at the `metadata.max.age.ms` interval). This allows a client to discover a better follower to fetch from if one becomes available. However, the expiration is done even if the current partition has no leader (which can happen in a DR scenario with observers). It makes sense to get the new preferred replica information and update the existing one, instead of expiring the existing one and then fetching a new one. Doing this will allow clients to keep fetching from a follower/observer instead of failing to find a leader when all ISR replicas go offline. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
ijuma commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480364696 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' Review comment: Is this informational so that we know the gradle version? Or is there more to it? ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') Review comment: Should we include `err` in the output somehow? Or is that not needed? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
mumrah commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480388916 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' Review comment: Yes informational to see the gradle version and jdk that's running This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
mumrah commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480391239 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') Review comment: I think the err here will be like an ExecutionException -- I'm going to try something different from this try/catch, actually This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
lbradstreet commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480392178 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') + } +} + +def doTest() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \ + --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@" +''' + } catch(err) { +echo 'Some tests failed, marking this build UNSTABLE' +currentBuild.result = 'UNSTABLE' Review comment: I think this may mark a build as UNSTABLE if one of gradle's executors exited. We're trying to achieve the property that the build won't fail completely if one of the tests fails, right? If so, I think the best way is to supply `-PignoreFailures=true` at https://github.com/apache/kafka/pull/9226/files#diff-58231b16fdee45a03a4ee3cf94a9f2c3R44, and add: ``` task integrationTest(type: Test, dependsOn: compileJava) { ignoreFailures = userIgnoreFailures ``` ``` task unitTest(type: Test, dependsOn: compileJava) { ignoreFailures = userIgnoreFailures ``` ``` ext { userIgnoreFailures = project.hasProperty('ignoreFailures') ? ignoreFailures : false ``` to build.gradle. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10452) Only expire preferred read replica if a leader is alive for the topic
[ https://issues.apache.org/jira/browse/KAFKA-10452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Kim updated KAFKA-10452: - Description: Fetch-from-follower functionality periodically expires and refreshes the preferred read replica (at the `metadata.max.age.ms` interval). This allows a client to discover a better follower to fetch from if one becomes available. However, the expiration is done even if the current partition has no leader. It makes sense to get the new preferred replica information and update the existing one, instead of expiring the existing one and then fetching a new one. Doing this will allow clients to keep fetching from a follower instead of failing to find a leader when all ISR replicas go offline. was: Fetch-from-follower functionality periodically expires and refreshes the preferred read replica (at the `metadata.max.age.ms` interval). This allows a client to discover a better follower to fetch from if one becomes available. However, the expiration is done even if the current partition has no leader (which can happen in a DR scenario with observers). It makes sense to get the new preferred replica information and update the existing one, instead of expiring the existing one and then fetching a new one. Doing this will allow clients to keep fetching from a follower/observer instead of failing to find a leader when all ISR replicas go offline. > Only expire preferred read replica if a leader is alive for the topic > - > > Key: KAFKA-10452 > URL: https://issues.apache.org/jira/browse/KAFKA-10452 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Major > > Fetch-from-follower functionality periodically expires and refreshes the > preferred read replica (at the `metadata.max.age.ms` interval). This allows a > client to discover a better follower to fetch from if one becomes available. > However, the expiration is done even if the current partition has no leader. > It makes sense to get the new preferred replica information and update the > existing one, instead of expiring the existing one and then fetching a new one. > Doing this will allow clients to keep fetching from a follower instead of > failing to find a leader when all ISR replicas go offline. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
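For reference, a hedged sketch of the settings this behavior involves (assuming KIP-392 follower fetching; rack names and intervals are illustrative). On the broker side a rack-aware selector must be configured, e.g. replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector; on the client side:
```
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FollowerFetchingConfig {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // the consumer's rack; the broker-side selector matches it against replica racks
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");
        // the interval at which the preferred read replica is expired and
        // refreshed, as described in the ticket
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "300000");
        return props;
    }
}
```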
[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
mumrah commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480393840 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') + } +} + +def doTest() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \ + --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@" +''' + } catch(err) { +echo 'Some tests failed, marking this build UNSTABLE' +currentBuild.result = 'UNSTABLE' Review comment: I see, so in this case we'll let the `junit` directive set the build to unstable? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9234: MINOR: Record all poll invocations
vvcephei opened a new pull request #9234: URL: https://github.com/apache/kafka/pull/9234 Record the `pollSensor` after every invocation to poll, rather than just when we get records back, so that we can accurately gauge how often we're invoking Consumer#poll. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9234: MINOR: Record all poll invocations
vvcephei commented on pull request #9234: URL: https://github.com/apache/kafka/pull/9234#issuecomment-684038074 Hey @guozhangwang and @cadonna , I was just looking into key metrics for monitoring, and it seems like this metric is misplaced. Unlike the `pollRecordsSensor`, it seems like the `pollSensor` should be recorded unconditionally. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
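To illustrate the distinction, a hedged sketch (the names are made up, not the actual StreamThread code): the poll sensor is recorded on every call, while the records sensor only fires when something came back:
```
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

public class PollMetrics {
    // illustrative helper; field and method names are hypothetical
    static <K, V> ConsumerRecords<K, V> pollAndRecord(final Consumer<K, V> consumer,
                                                      final Duration pollTime,
                                                      final Sensor pollSensor,
                                                      final Sensor pollRecordsSensor,
                                                      final Time time) {
        final long startMs = time.milliseconds();
        final ConsumerRecords<K, V> records = consumer.poll(pollTime);
        // record every invocation so the poll rate is measured even for empty polls
        pollSensor.record(time.milliseconds() - startMs);
        if (!records.isEmpty()) {
            // only meaningful when records actually came back
            pollRecordsSensor.record(records.count());
        }
        return records;
    }
}
```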
[GitHub] [kafka] lbradstreet commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
lbradstreet commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480404009 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') + } +} + +def doTest() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \ + --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@" +''' + } catch(err) { +echo 'Some tests failed, marking this build UNSTABLE' +currentBuild.result = 'UNSTABLE' Review comment: Correct, it'll be picked up when the test output files are picked up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r480407445 ## File path: core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala ## @@ -545,6 +558,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } + protected def createScramAdminClient(scramMechanism: String, user: String, password: String): Admin = { Review comment: Ok, this now invokes a new method on SaslSetup(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10453) Backport of PR-7781
Niketh Sabbineni created KAFKA-10453: Summary: Backport of PR-7781 Key: KAFKA-10453 URL: https://issues.apache.org/jira/browse/KAFKA-10453 Project: Kafka Issue Type: Wish Components: clients Affects Versions: 1.1.1 Reporter: Niketh Sabbineni We have been hitting this bug (with kafka 1.1.1) where the Producer takes forever to load metadata. The issue seems to have been patched in master [here|https://github.com/apache/kafka/pull/7781]. Would you *recommend* a backport of the above change to 1.1? There are 7-8 changes that need to be cherry-picked. The other option is to upgrade to 2.5 (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
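As a stopgap while deciding, one knob that bounds how long the producer blocks on metadata (a hedged sketch; the timeout value is illustrative, and this mitigates the symptom rather than fixing the underlying bug):
```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerMetadataTimeout {
    public static Properties producerProps() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // caps how long send()/partitionsFor() block waiting for metadata,
        // so a stuck metadata load fails fast instead of hanging
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
        return props;
    }
}
```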
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188017#comment-17188017 ] Ismael Juma commented on KAFKA-10453: - Can you help us understand why it would be much more involved to upgrade to 2.5? Our aim is to make it very easy to upgrade to newer versions of clients. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188020#comment-17188020 ] Niketh Sabbineni commented on KAFKA-10453: -- We are running on an older version of ZK. We need to upgrade to 3.x (3.4) to be compatible with kafka 2.5. I am trying to see if we can solve the production issues that we are seeing with Kafka 1.1.1 by backporting (or cherry-picking if necessary in our fork) instead of a full upgrade spanning multiple components. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188021#comment-17188021 ] Niketh Sabbineni commented on KAFKA-10453: -- IMO [https://github.com/apache/kafka/pull/7781] is a good fix to have in older releases too, if we can easily backport it. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] tinawenqiao opened a new pull request #9235: KAFKA-10449: Add some important parameter description in connect-distributed.prope…
tinawenqiao opened a new pull request #9235: URL: https://github.com/apache/kafka/pull/9235 …rties. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tinawenqiao commented on pull request #9235: KAFKA-10449: Add some important parameter desc in connect-distributed.properties
tinawenqiao commented on pull request #9235: URL: https://github.com/apache/kafka/pull/9235#issuecomment-684064832 In WorkerConfig.java we found that REST_HOST_NAME_CONFIG (rest.host.name) and REST_PORT_CONFIG (rest.port) are deprecated, and some new configuration parameters have been introduced, such as LISTENERS_CONFIG (listeners), REST_ADVERTISED_LISTENER_CONFIG (rest.advertised.listener), and ADMIN_LISTENERS_CONFIG (admin.listeners), but they are not listed in the sample config file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
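For example, the replacement entries might look like this in connect-distributed.properties (a hedged sketch; hosts and ports are illustrative):
```
# replaces the deprecated rest.host.name/rest.port pair
listeners=http://:8083
# which listener other workers should use to reach this one (http or https)
rest.advertised.listener=http
# optional separate listener for admin endpoints
admin.listeners=http://localhost:8084
```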
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188024#comment-17188024 ] Ismael Juma commented on KAFKA-10453: - Kafka clients have no ZK dependency, so you can upgrade them without changing anything else. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188028#comment-17188028 ] Niketh Sabbineni commented on KAFKA-10453: -- [~ijuma] Thanks for your reply. Do you happen to know if the kafka 2.5 client is compatible with a kafka 1.1.1 core/server? I should have checked that before opening the issue. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188028#comment-17188028 ] Niketh Sabbineni edited comment on KAFKA-10453 at 8/31/20, 10:07 PM: - [~ijuma] Thanks for your reply. Do you happen to know if the kafka 2.5 client is compatible with a kafka 1.1.1 core/server? I should have checked that before opening the issue. I am digging to check that. was (Author: niketh): [~ijuma] Thanks for your reply. Do you happen to know if the kafka 2.5 client is compatible with a kafka 1.1.1 core/server? I should have checked that before opening the issue. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188031#comment-17188031 ] Niketh Sabbineni commented on KAFKA-10453: -- Thank you! Let me test that out then. Thanks for your help. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r476954744 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -917,17 +938,14 @@ private synchronized void resetGeneration() { synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) { log.debug("Resetting generation after encountering {} from {} response and requesting re-join", error, api); -// only reset the state to un-joined when it is not already in rebalancing Review comment: We do not need this check any more, since we now only reset the generation if we see an illegal generation or unknown member id, and in either case we should no longer heartbeat. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
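A hedged Java sketch of the rule being described (an illustration, not the actual AbstractCoordinator code): the generation is only reset, and the heartbeat stopped, for the two fatal errors:
```
import org.apache.kafka.common.protocol.Errors;

public class GenerationResetRule {
    // illustrative helper mirroring the comment above: only these errors
    // invalidate the member id/generation, so only they force a reset;
    // all other errors leave the generation (and the heartbeat thread) alone
    static boolean shouldResetGeneration(final Errors error) {
        return error == Errors.ILLEGAL_GENERATION || error == Errors.UNKNOWN_MEMBER_ID;
    }

    public static void main(final String[] args) {
        System.out.println(shouldResetGeneration(Errors.REBALANCE_IN_PROGRESS)); // false
        System.out.println(shouldResetGeneration(Errors.ILLEGAL_GENERATION));    // true
    }
}
```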
[jira] [Commented] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188029#comment-17188029 ] Ismael Juma commented on KAFKA-10453: - Yes, it is. > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry-picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r480425820 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -497,40 +501,18 @@ private synchronized void resetStateAndRejoin() { joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { @Override public void onSuccess(ByteBuffer value) { -// handle join completion in the callback so that the callback will be invoked Review comment: Well, I should say part of that (the enabling of the heartbeat thread) is in the JoinGroup response handler, while the rest (updating metrics, etc.) is in the SyncGroup response handler. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int, responseCallback(Errors.UNKNOWN_MEMBER_ID) case CompletingRebalance => -responseCallback(Errors.REBALANCE_IN_PROGRESS) + // consumers may start sending heartbeat after join-group response, in which case + // we should treat them as normal hb request and reset the timer + val member = group.get(memberId) Review comment: It returned the error code before because the broker did not expect clients to send heartbeats before sending sync-group requests. That is no longer the case. ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -639,7 +641,11 @@ class GroupCoordinator(val brokerId: Int, responseCallback(Errors.UNKNOWN_MEMBER_ID) case CompletingRebalance => -responseCallback(Errors.REBALANCE_IN_PROGRESS) Review comment: I had a discussion with @hachikuji about this. I think logically it should not return `REBALANCE_IN_PROGRESS`, and clients in the future should update their handling logic too, maybe after some releases where we can break client-broker compatibility. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -446,14 +453,15 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); needsJoinPrepare = true; } else { -log.info("Generation data was cleared by heartbeat thread. Initiating rejoin."); +log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " + + "the rebalance callback is triggered, marking this rebalance as failed and retry", + generation, state); resetStateAndRejoin(); resetJoinGroupFuture(); -return false; } } else { final RuntimeException exception = future.exception(); -log.info("Join group failed with {}", exception.toString()); +log.info("Rebalance failed with {}", exception.toString()); Review comment: The reason I changed it is exactly that it may not always be due to a join-group failure :) If sync-group failed, this could also be triggered. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
vvcephei commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r480429410 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer<Agg> initializer; +private final Aggregator<? super K, ? super V, Agg> aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { +this.windows = windows; +this.storeName = storeName; +this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor<K, V> get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> { +private TimestampedWindowStore<K, Agg> windowStore; +private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +windowStore = (Time
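For context on how this new operator surfaces in the DSL, here is a minimal usage sketch of the SlidingWindows API from KIP-450; the topic name, serdes, and the count() aggregation are illustrative assumptions, not taken from the PR:
```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SlidingWindows;

public class SlidingWindowExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> events =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Count events per key over sliding windows with a 100 ms time difference
        // and a 500 ms grace period, as introduced by KIP-450.
        events.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
                  Duration.ofMillis(100), Duration.ofMillis(500)))
              .count()
              .toStream()
              .foreach((windowedKey, count) ->
                  System.out.println(windowedKey + " -> " + count));
    }
}
```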
[GitHub] [kafka] guozhangwang commented on a change in pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on a change in pull request #8834: URL: https://github.com/apache/kafka/pull/8834#discussion_r480429496 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -528,7 +528,6 @@ public void onFailure(RuntimeException e) { } private void recordRebalanceFailure() { -state = MemberState.UNJOINED; Review comment: Previously this function had two lines: update the state and record the sensor. Now that the first is done in the caller, this function becomes a one-liner and is no longer worth keeping as a separate function, so I inlined it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
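As an aside, a minimal sketch of the inlining pattern described in that comment; all names here are simplified stand-ins, not the actual AbstractCoordinator members:
```java
// Minimal sketch of inlining a helper that has shrunk to one line.
public class CoordinatorSketch {
    enum MemberState { JOINED, UNJOINED }

    private MemberState state = MemberState.JOINED;
    private int failedRebalances = 0; // stands in for the failure sensor

    // Before: the helper both updated the state and recorded the failure.
    // private void recordRebalanceFailure() {
    //     state = MemberState.UNJOINED;
    //     failedRebalances++;
    // }

    // After: the caller sets the state itself, so the remaining one-liner is inlined.
    public void onJoinFailure(final RuntimeException e) {
        state = MemberState.UNJOINED; // moved into the caller
        failedRebalances++;           // former body of recordRebalanceFailure()
    }
}
```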
[GitHub] [kafka] vvcephei merged pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
vvcephei merged pull request #9039: URL: https://github.com/apache/kafka/pull/9039 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
vvcephei commented on pull request #9039: URL: https://github.com/apache/kafka/pull/9039#issuecomment-684074223 Since Jenkins PR builds are still not functioning, I've merged in trunk and verified this pull request locally before merging it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9226: KAFKA-10444: Jenkinsfile testing
mumrah commented on a change in pull request #9226: URL: https://github.com/apache/kafka/pull/9226#discussion_r480452574 ## File path: Jenkinsfile ## @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +def setupGradle() { + // Delete gradle cache to workaround cache corruption bugs, see KAFKA-3167 + dir('.gradle') { +deleteDir() + } + sh './gradlew -version' +} + +def doValidation() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION clean compileJava compileScala compileTestJava compileTestScala \ + spotlessScalaCheck checkstyleMain checkstyleTest spotbugsMain rat \ + --profile --no-daemon --continue -PxmlSpotBugsReport=true \"$@\" +''' + } catch(err) { +error('Validation checks failed, aborting this build') + } +} + +def doTest() { + try { +sh ''' + ./gradlew -PscalaVersion=$SCALA_VERSION unitTest integrationTest \ + --profile --no-daemon --continue -PtestLoggingEvents=started,passed,skipped,failed "$@" +''' + } catch(err) { +echo 'Some tests failed, marking this build UNSTABLE' +currentBuild.result = 'UNSTABLE' Review comment: So, this seems to have caused us to lose which build had which results in the summary. Before we had: [screenshot: build summary attributing test results to each individual build] Now we have: [screenshot: build summary where results are no longer attributed per build] I'll try moving the `junit` call inside the actual stage and see if that helps This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()
[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188042#comment-17188042 ] Aakash Gupta commented on KAFKA-2200: - Hi [~becket_qin] I am willing to take this ticket. As of today, this is how exceptions are handled in the KafkaProducer.send() method: {code:java} catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; } {code} # Regarding the TimeoutException while waiting for a metadata update: what is your suggestion? How should it be handled, if not via the ApiException callback? As you mentioned, we are misusing this TimeoutException, since the idea was to use it only where replication couldn't complete within the allowed time. Should we create a new exception 'ClientTimeoutException' to handle such scenarios, and also use it in the waitOnMetadata() method? # Validation of the message size throws RecordTooLargeException, which extends ApiException. In this case, you are correct to say that the producer client throws RecordTooLargeException without even interacting with the server. You've suggested 2 scenarios which can cause exceptions: ## *If the size of the serialised uncompressed message is more than maxRequestSize*: I'm not sure we can estimate the size of a message while taking the compression type into consideration. So the current implementation throws RecordTooLargeException based on the ESTIMATE, without taking the compression type into account. What is the expected behaviour in this case? ## *If the message size is bigger than the totalMemorySize or memoryBufferSize*: The buffer pool would throw IllegalArgumentException when asked for an allocation. Should we just catch this exception, record it, and rethrow it? [~becket_qin] Can you please answer the above queries and validate my understanding? Apologies if I've misunderstood something, as I am new to the Kafka community. > kafkaProducer.send() should not call callback.onCompletion() > > > Key: KAFKA-2200 > URL: https://issues.apache.org/jira/browse/KAFKA-2200 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.0 >Reporter: Jiangjie Qin >Priority: Major > Labels: newbie > > KafkaProducer.send() should not call callback.onCompletion() because this > might break the callback firing order. -- This message was sent by Atlassian Jira (v8.3.4#803005)
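To illustrate the first question, here is a minimal sketch of how a metadata-wait timeout could be reported through the callback and a failed future rather than thrown out of send(). ClientTimeoutException, waitOnMetadata(), and doSend() are hypothetical stand-ins; only Callback, ProducerRecord, and RecordMetadata are real producer classes.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of a send() path; not the actual KafkaProducer implementation.
abstract class SendSketch {

    // Hypothetical exception distinguishing client-side waits from
    // server-side replication timeouts.
    static class ClientTimeoutException extends RuntimeException {
        ClientTimeoutException(final String message) { super(message); }
    }

    // Blocks until metadata is available or throws ClientTimeoutException (hypothetical).
    abstract void waitOnMetadata(String topic, long maxWaitMs) throws ClientTimeoutException;

    // Hands the record to the accumulator/sender (hypothetical).
    abstract Future<RecordMetadata> doSend(ProducerRecord<byte[], byte[]> record, Callback callback);

    Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
        try {
            waitOnMetadata(record.topic(), 60_000L);
        } catch (final ClientTimeoutException e) {
            // Deliver the failure via the callback and a failed future,
            // preserving callback ordering, instead of throwing from send().
            if (callback != null)
                callback.onCompletion(null, e);
            final CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        return doSend(record, callback);
    }
}
{code}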
[jira] [Assigned] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()
[ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aakash Gupta reassigned KAFKA-2200: --- Assignee: Aakash Gupta > kafkaProducer.send() should not call callback.onCompletion() > > > Key: KAFKA-2200 > URL: https://issues.apache.org/jira/browse/KAFKA-2200 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.0 >Reporter: Jiangjie Qin >Assignee: Aakash Gupta >Priority: Major > Labels: newbie > > KafkaProducer.send() should not call callback.onCompletion() because this > might break the callback firing order. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
guozhangwang commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r480469688 ## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ## @@ -136,34 +174,64 @@ * * This iterator must be closed after use. * - * @param from the first key in the range - * @param to the last key in the range - * @param fromTime time range start (inclusive) - * @param toTime time range end (inclusive) - * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive), where iteration starts. + * @param timeTo time range end (inclusive), where iteration ends. + * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from beginning to end of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException If {@code null} is used for any key. - * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} + * @throws NullPointerException If {@code null} is used for any key. + * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ -KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant fromTime, Instant toTime) +KeyValueIterator<Windowed<K>, V> fetch(K from, K to, Instant timeFrom, Instant timeTo) throws IllegalArgumentException; /** -* Gets all the key-value pairs in the existing windows. -* -* @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} -* @throws InvalidStateStoreException if the store is not initialized -*/ + * Get all the key-value pairs in the given key range and time range from all the existing windows + * in backward order with respect to time (from end to beginning of time). + * + * This iterator must be closed after use. + * + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive), where iteration ends. + * @param timeTo time range end (inclusive), where iteration starts. + * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>}, from end to beginning of time. + * @throws InvalidStateStoreException if the store is not initialized + * @throws NullPointerException If {@code null} is used for any key. + * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} + */ +KeyValueIterator<Windowed<K>, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo) Review comment: This is out of the scope of this PR, but I'd like to point out that the current IQ does not actually obey the ordering when there are multiple local stores hosted on that instance. For example, if there are two stores from two tasks hosting keys {1, 3} and {2,4}, then a range query of key [1,4] would return in the order of `1,3,2,4` but not `1,2,3,4` since it is looping over the stores only. This would be the case for either forward or backward fetches on range-key-range-time. For single key time range fetch, of course, there's no such issue. I think it's worth documenting this for now until we have a fix (and actually we are going to propose something soon). ## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ## @@ -33,11 +33,11 @@ /** * Get the value of key from a window. * - * @param key the key to fetch - * @param time start timestamp (inclusive) of the window + * @param key the key to fetch + * @param time start timestamp (inclusive) of the window * @return The value or {@code null} if no value is found in the window * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException If {@code null} is used for any key. + * @throws NullPointerException If {@code null} is used for any key. Review comment: nit: is this intentional? Also I'd suggest we do not use capitalized `If` to be consistent with the above line, ditto elsewhere below. ## File path: streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java ## @@ -150,13 +185,25 @@ * @return an iterator over windowed key-value pairs {@code <Windowed<K>, value>} * @throws InvalidStateStoreException if the store is not initialized */ -@SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed +// note, this method must be kept if super#fetchAll(...) is removed +@SuppressWarnings("deprecation") KeyValueIterator<Windowed<K>, V> fetchAll(l
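To make the ordering caveat above concrete, here is a sketch of a k-way merge that would restore a global key order across per-store iterators. This is not how the composite store currently works (the point above is precisely that it concatenates stores); it is only an illustration, assuming comparable keys:
```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// K-way merge over per-store iterators that are each sorted by key.
final class MergedOrderedIterator<K extends Comparable<K>> implements Iterator<K> {

    private static final class Head<K> {
        final K value;
        final Iterator<K> source;
        Head(final K value, final Iterator<K> source) { this.value = value; this.source = source; }
    }

    // Smallest current head across all stores comes out first.
    private final PriorityQueue<Head<K>> heads =
        new PriorityQueue<>(Comparator.comparing((Head<K> h) -> h.value));

    MergedOrderedIterator(final List<Iterator<K>> perStoreIterators) {
        for (final Iterator<K> it : perStoreIterators)
            if (it.hasNext())
                heads.add(new Head<>(it.next(), it));
    }

    @Override public boolean hasNext() { return !heads.isEmpty(); }

    @Override public K next() {
        final Head<K> head = heads.poll();
        if (head.source.hasNext())
            heads.add(new Head<>(head.source.next(), head.source));
        return head.value;
    }

    public static void main(final String[] args) {
        // Two "stores" hosting keys {1, 3} and {2, 4}: concatenation yields 1,3,2,4,
        // while this merge yields 1,2,3,4.
        final List<Iterator<Integer>> stores = new ArrayList<>();
        stores.add(List.of(1, 3).iterator());
        stores.add(List.of(2, 4).iterator());
        new MergedOrderedIterator<>(stores).forEachRemaining(System.out::println);
    }
}
```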
[GitHub] [kafka] guozhangwang commented on a change in pull request #9232: KAFKA-9924: Add remaining property-based RocksDB metrics as described in KIP-607
guozhangwang commented on a change in pull request #9232: URL: https://github.com/apache/kafka/pull/9232#discussion_r480477639 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java ## @@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist statistics != null && storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) { -throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId + -" is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " + +throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId + +" is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " + "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " + "This is a bug in Kafka Streams. " + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); } } +private void verifyDbAndCacheAndStatistics(final String segmentName, + final RocksDB db, + final Cache cache, + final Statistics statistics) { +for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { +verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics"); +verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache"); +if (db == valueProviders.db) { +throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId + +" was already added for another segment as a value provider. This is a bug in Kafka Streams. " + +"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); +} +if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) { Review comment: Hmm, why do we need the second condition to determine `singleCache = false` here? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java ## @@ -150,14 +176,55 @@ private void verifyStatistics(final String segmentName, final Statistics statist statistics != null && storeToValueProviders.values().stream().anyMatch(valueProviders -> valueProviders.statistics == null))) { -throw new IllegalStateException("Statistics for store \"" + segmentName + "\" of task " + taskId + -" is" + (statistics == null ? " " : " not ") + "null although the statistics of another store in this " + +throw new IllegalStateException("Statistics for segment " + segmentName + " of task " + taskId + +" is" + (statistics == null ? " " : " not ") + "null although the statistics of another segment in this " + "metrics recorder is" + (statistics != null ? " " : " not ") + "null. " + "This is a bug in Kafka Streams. " + "Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); } } +private void verifyDbAndCacheAndStatistics(final String segmentName, + final RocksDB db, + final Cache cache, + final Statistics statistics) { +for (final DbAndCacheAndStatistics valueProviders : storeToValueProviders.values()) { +verifyIfSomeAreNull(segmentName, statistics, valueProviders.statistics, "statistics"); +verifyIfSomeAreNull(segmentName, cache, valueProviders.cache, "cache"); +if (db == valueProviders.db) { +throw new IllegalStateException("DB instance for store " + segmentName + " of task " + taskId + +" was already added for another segment as a value provider. This is a bug in Kafka Streams. " + +"Please open a bug report under https://issues.apache.org/jira/projects/KAFKA/issues"); +} +if (storeToValueProviders.size() == 1 && cache != valueProviders.cache) { +singleCache = false; +} else if (singleCache && cache != valueProviders.cache || !singleCache && cache == valueProviders.cache) { +throw new IllegalStateException("Caches for store " + storeName + " of task " + taskId + +" are either not all distinct or do not all refer to t
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r480479677 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ## @@ -271,27 +345,68 @@ public synchronized void put(final Bytes key, final PeekingKeyValueIterator<Bytes, LRUCacheEntry> filteredCacheIterator = new FilteredCacheIterator(cacheIterator, hasNextCondition, cacheFunction); return new MergedSortedCacheWindowStoreKeyValueIterator( -filteredCacheIterator, -underlyingIterator, -bytesSerdes, -windowSize, -cacheFunction +filteredCacheIterator, +underlyingIterator, +bytesSerdes, +windowSize, +cacheFunction, +true +); +} + +@Override +public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, Review comment: We would still need to keep this method: we're not removing all long-based APIs, just the public/IQ methods in ReadOnlyWindowStore. But we still want to keep the long-based methods on WindowStore and all the internal store interfaces for performance reasons. Maybe once we move everything to use `Instant` all the way down to the serialization then we can remove these long-based methods. I guess we should consider that when discussing KIP-667, but for the time being at least, we should keep them for internal use This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
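A minimal sketch of the layering described above: an Instant-based public method validates its arguments and delegates to a long-milliseconds internal method that avoids per-record Instant allocations on the hot path. The interface and method names are illustrative, not the actual store interfaces:
```java
import java.time.Instant;

interface WindowFetcherSketch<V> {

    // Internal, long-based hot path (kept for performance).
    V fetchInternal(byte[] key, long timeFromMs, long timeToMs);

    // Public, Instant-based API delegating to the internal one.
    default V fetch(final byte[] key, final Instant timeFrom, final Instant timeTo) {
        final long fromMs = toEpochMilliOrThrow(timeFrom, "timeFrom");
        final long toMs = toEpochMilliOrThrow(timeTo, "timeTo");
        return fetchInternal(key, fromMs, toMs);
    }

    static long toEpochMilliOrThrow(final Instant instant, final String name) {
        try {
            return instant.toEpochMilli(); // throws ArithmeticException on overflow
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                name + " can't be represented as long milliseconds", e);
        }
    }
}
```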
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r480481060 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java ## @@ -338,25 +452,36 @@ public synchronized void close() { private CacheIteratorWrapper(final Bytes key, final long timeFrom, - final long timeTo) { -this(key, key, timeFrom, timeTo); + final long timeTo, + final boolean forward) { +this(key, key, timeFrom, timeTo, forward); } private CacheIteratorWrapper(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, - final long timeTo) { + final long timeTo, + final boolean forward) { this.keyFrom = keyFrom; this.keyTo = keyTo; this.timeTo = timeTo; -this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); +this.forward = forward; this.segmentInterval = cacheFunction.getSegmentInterval(); -this.currentSegmentId = cacheFunction.segmentId(timeFrom); -setCacheKeyRange(timeFrom, currentSegmentLastTime()); +if (forward) { +this.lastSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); +this.currentSegmentId = cacheFunction.segmentId(timeFrom); -this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); +setCacheKeyRange(timeFrom, currentSegmentLastTime()); +this.current = context.cache().range(cacheName, cacheKeyFrom, cacheKeyTo); +} else { +this.currentSegmentId = cacheFunction.segmentId(Math.min(timeTo, maxObservedTimestamp.get())); +this.lastSegmentId = cacheFunction.segmentId(timeFrom); + +setCacheKeyRange(currentSegmentBeginTime(), Math.min(timeTo, maxObservedTimestamp.get())); Review comment: This looks right to me -- in the iterator constructor, we would normally start from `timeFrom` (the minimum time) and advance to the end of the current segment (that's what the "cache key range" defines, the range of the current segment) When iterating backwards, the current segment is actually the largest segment, so the cache key lower range is the current (largest) segment's beginning timestamp, and the upper range is the maximum timestamp of the backwards fetch. Does that make sense? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
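To make the segment arithmetic above concrete, here is a small self-contained sketch of how the first cache key range differs between forward and backward iteration, assuming segmentId(t) = t / segmentInterval (names simplified from CacheIteratorWrapper):
```java
// Simplified segment arithmetic for a segmented cache iterator: a segment
// covers [segmentId * interval, (segmentId + 1) * interval - 1].
final class SegmentRangeSketch {
    private final long segmentInterval;

    SegmentRangeSketch(final long segmentInterval) { this.segmentInterval = segmentInterval; }

    long segmentId(final long timestamp) { return timestamp / segmentInterval; }

    long segmentBeginTime(final long segmentId) { return segmentId * segmentInterval; }

    long segmentLastTime(final long segmentId) { return segmentBeginTime(segmentId) + segmentInterval - 1; }

    public static void main(final String[] args) {
        final SegmentRangeSketch s = new SegmentRangeSketch(1000L);
        final long timeFrom = 1500L, timeTo = 4200L, maxObserved = 3900L;

        // Forward: start in the segment containing timeFrom and advance toward
        // min(timeTo, maxObserved); the first cache key range is
        // [timeFrom, end of the current (smallest) segment].
        final long fwdCurrent = s.segmentId(timeFrom);
        System.out.println("forward first range: [" + timeFrom + ", "
            + s.segmentLastTime(fwdCurrent) + "]");

        // Backward: start in the segment containing min(timeTo, maxObserved) and
        // walk down toward timeFrom's segment; the first cache key range is
        // [beginning of the current (largest) segment, min(timeTo, maxObserved)].
        final long upper = Math.min(timeTo, maxObserved);
        final long bwdCurrent = s.segmentId(upper);
        System.out.println("backward first range: [" + s.segmentBeginTime(bwdCurrent)
            + ", " + upper + "]");
    }
}
```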
[GitHub] [kafka] ableegoldman commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
ableegoldman commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r480482350 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java ## @@ -67,14 +70,22 @@ public Bytes peekNextKey() { public boolean hasNext() { boolean hasNext = false; while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen()) -&& segments.hasNext()) { +&& segments.hasNext()) { close(); currentSegment = segments.next(); try { if (from == null || to == null) { -currentIterator = currentSegment.all(); +if (forward) { +currentIterator = currentSegment.all(); +} else { +currentIterator = currentSegment.reverseAll(); Review comment: Yeah that's a good question, it does seem like we can just remove the `range` and `all` on the Segment interface This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188058#comment-17188058 ] Jerry Wei commented on KAFKA-10134: --- [~guozhang] One more question about PR #8834: are the *GroupCoordinator* changes mandatory? I mean, Kafka server changes tend to be more expensive than client changes. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.5.2, 2.6.1 > > Attachments: consumer3.log.2020-08-20.log, > consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are n
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188082#comment-17188082 ] huxihx commented on KAFKA-10450: Same version for both clients and brokers? > console-producer throws Uncaught error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > --- > > Key: KAFKA-10450 > URL: https://issues.apache.org/jira/browse/KAFKA-10450 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: Kafka Version 2.6.0 > MacOS Version - macOS Catalina 10.15.6 (19G2021) > java version "11.0.8" 2020-07-14 LTS > Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) > Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) >Reporter: Jigar Naik >Priority: Blocker > > Kafka-console-producer.sh gives below error on Mac > ERROR [Producer clientId=console-producer] Uncaught error in kafka producer > I/O thread: (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > *Steps to re-produce the issue.* > Download Kafka from > [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz] > > Change data and log directory (Optional) > Create Topic Using below command > > {code:java} > ./kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 1 \ > --topic my-topic{code} > > Start Kafka console producer using below command > > {code:java} > ./kafka-console-consumer.sh \ > --topic my-topic \ > --from-beginning \ > --bootstrap-server localhost:9092{code} > > Gives below output > > {code:java} > ./kafka-console-producer.sh \ > --topic my-topic \ > --bootstrap-server 127.0.0.1:9092 > >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] > >Uncaught error in kafka producer I/O thread: > >(org.apache.kafka.clients.producer.internals.Sender) > java.nio.BufferUnderflowException > at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650) > at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391) > at > org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43) > at > org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102) > at > org.apache.kafka.common.message.ResponseHeaderData.(ResponseHeaderData.java:70) > at > org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66) > at > org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught > error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > at > org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62) > at > org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70) > at > 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap > broker 127.0.0.1:9092 (id: -1 rack: null) disconnected > (org.apache.kafka.clients.NetworkClient) > {code} > > > The same steps works fine with Kafka version 2.0.0 on Mac. > The same steps works fine with Kafka version 2.6.0 on Windows. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jigar Naik updated KAFKA-10450: --- Priority: Critical (was: Blocker) > console-producer throws Uncaught error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > --- > > Key: KAFKA-10450 > URL: https://issues.apache.org/jira/browse/KAFKA-10450 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: Kafka Version 2.6.0 > MacOS Version - macOS Catalina 10.15.6 (19G2021) > java version "11.0.8" 2020-07-14 LTS > Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) > Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) >Reporter: Jigar Naik >Priority: Critical > > Kafka-console-producer.sh gives below error on Mac > ERROR [Producer clientId=console-producer] Uncaught error in kafka producer > I/O thread: (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > *Steps to re-produce the issue.* > Download Kafka from > [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz] > > Change data and log directory (Optional) > Create Topic Using below command > > {code:java} > ./kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 1 \ > --topic my-topic{code} > > Start Kafka console producer using below command > > {code:java} > ./kafka-console-consumer.sh \ > --topic my-topic \ > --from-beginning \ > --bootstrap-server localhost:9092{code} > > Gives below output > > {code:java} > ./kafka-console-producer.sh \ > --topic my-topic \ > --bootstrap-server 127.0.0.1:9092 > >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] > >Uncaught error in kafka producer I/O thread: > >(org.apache.kafka.clients.producer.internals.Sender) > java.nio.BufferUnderflowException > at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650) > at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391) > at > org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43) > at > org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102) > at > org.apache.kafka.common.message.ResponseHeaderData.(ResponseHeaderData.java:70) > at > org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66) > at > org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught > error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > at > org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62) > at > org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70) > at > 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap > broker 127.0.0.1:9092 (id: -1 rack: null) disconnected > (org.apache.kafka.clients.NetworkClient) > {code} > > > The same steps works fine with Kafka version 2.0.0 on Mac. > The same steps works fine with Kafka version 2.6.0 on Windows. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are n
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188085#comment-17188085 ] Jigar Naik commented on KAFKA-10450: Yes, same version for both. The kafka-console-consumer that ships with Kafka also gave the same error. I have found the issue: port 9092 was being used by the SonarQube H2 DB. After stopping SonarQube, everything worked fine. Surprisingly, I was expecting an 'address already in use' error instead of this stack trace, and kafka-topics.sh also worked fine without any issue. Changing the priority to Minor as it's no longer a blocker. It would be better if a proper error message were displayed instead of this stack trace; it would have saved me a few hours :) Thanks! > console-producer throws Uncaught error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > --- > > Key: KAFKA-10450 > URL: https://issues.apache.org/jira/browse/KAFKA-10450 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: Kafka Version 2.6.0 > MacOS Version - macOS Catalina 10.15.6 (19G2021) > java version "11.0.8" 2020-07-14 LTS > Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) > Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) >Reporter: Jigar Naik >Priority: Critical > > Kafka-console-producer.sh gives below error on Mac > ERROR [Producer clientId=console-producer] Uncaught error in kafka producer > I/O thread: (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > *Steps to re-produce the issue.* > Download Kafka from > [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz] > > Change data and log directory (Optional) > Create Topic Using below command > > {code:java} > ./kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 1 \ > --topic my-topic{code} > > Start Kafka console producer using below command > > {code:java} > ./kafka-console-consumer.sh \ > --topic my-topic \ > --from-beginning \ > --bootstrap-server localhost:9092{code} > > Gives below output > > {code:java} > ./kafka-console-producer.sh \ > --topic my-topic \ > --bootstrap-server 127.0.0.1:9092 > >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] > >Uncaught error in kafka producer I/O thread: > >(org.apache.kafka.clients.producer.internals.Sender) > java.nio.BufferUnderflowException > at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650) > at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391) > at > org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43) > at > org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102) > at > org.apache.kafka.common.message.ResponseHeaderData.(ResponseHeaderData.java:70) > at > org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66) > at > org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught > error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > at > org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62) > at > org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap > broker 127.0.0.1:9092 (id: -1 rack: null) disconnected > (org.apache.kafka.
[jira] [Updated] (KAFKA-10450) console-producer throws Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender) java.lang.IllegalStateException: There are no
[ https://issues.apache.org/jira/browse/KAFKA-10450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jigar Naik updated KAFKA-10450: --- Priority: Minor (was: Critical) > console-producer throws Uncaught error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > --- > > Key: KAFKA-10450 > URL: https://issues.apache.org/jira/browse/KAFKA-10450 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.6.0 > Environment: Kafka Version 2.6.0 > MacOS Version - macOS Catalina 10.15.6 (19G2021) > java version "11.0.8" 2020-07-14 LTS > Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) > Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) >Reporter: Jigar Naik >Priority: Minor > > Kafka-console-producer.sh gives below error on Mac > ERROR [Producer clientId=console-producer] Uncaught error in kafka producer > I/O thread: (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > *Steps to re-produce the issue.* > Download Kafka from > [kafka_2.13-2.6.0.tgz|https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.13-2.6.0.tgz] > > Change data and log directory (Optional) > Create Topic Using below command > > {code:java} > ./kafka-topics.sh \ > --create \ > --zookeeper localhost:2181 \ > --replication-factor 1 \ > --partitions 1 \ > --topic my-topic{code} > > Start Kafka console producer using below command > > {code:java} > ./kafka-console-consumer.sh \ > --topic my-topic \ > --from-beginning \ > --bootstrap-server localhost:9092{code} > > Gives below output > > {code:java} > ./kafka-console-producer.sh \ > --topic my-topic \ > --bootstrap-server 127.0.0.1:9092 > >[2020-09-01 00:24:18,177] ERROR [Producer clientId=console-producer] > >Uncaught error in kafka producer I/O thread: > >(org.apache.kafka.clients.producer.internals.Sender) > java.nio.BufferUnderflowException > at java.base/java.nio.Buffer.nextGetIndex(Buffer.java:650) > at java.base/java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:391) > at > org.apache.kafka.common.protocol.ByteBufferAccessor.readInt(ByteBufferAccessor.java:43) > at > org.apache.kafka.common.message.ResponseHeaderData.read(ResponseHeaderData.java:102) > at > org.apache.kafka.common.message.ResponseHeaderData.(ResponseHeaderData.java:70) > at > org.apache.kafka.common.requests.ResponseHeader.parse(ResponseHeader.java:66) > at > org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:717) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:834) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,179] ERROR [Producer clientId=console-producer] Uncaught > error in kafka producer I/O thread: > (org.apache.kafka.clients.producer.internals.Sender) > java.lang.IllegalStateException: There are no in-flight requests for node -1 > at > org.apache.kafka.clients.InFlightRequests.requestQueue(InFlightRequests.java:62) > at > org.apache.kafka.clients.InFlightRequests.completeNext(InFlightRequests.java:70) > at > 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:833) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553) > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) > at java.base/java.lang.Thread.run(Thread.java:834) > [2020-09-01 00:24:18,682] WARN [Producer clientId=console-producer] Bootstrap > broker 127.0.0.1:9092 (id: -1 rack: null) disconnected > (org.apache.kafka.clients.NetworkClient) > {code} > > > The same steps works fine with Kafka version 2.0.0 on Mac. > The same steps works fine with Kafka version 2.6.0 on Windows. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #9226: KAFKA-10444: Jenkinsfile testing
mumrah commented on pull request #9226: URL: https://github.com/apache/kafka/pull/9226#issuecomment-684175203 Example failed build with a compile error: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9226/67 One with a rat failure: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9226/66/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10366) TimeWindowedDeserializer doesn't allow users to set a custom window size
[ https://issues.apache.org/jira/browse/KAFKA-10366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188115#comment-17188115 ] Sophie Blee-Goldman commented on KAFKA-10366: - Yeah thanks for tracking that down [~mjsax]. I think we can repurpose/reinterpret this ticket to track fixing the Streams tests/test utils. If we have to add a lot of overloads throughout the call hierarchy between the test and the actual Consumer creation, maybe we can save some work by just adding a ConsumerParameters class (better names welcome) and replacing the ConsumerConfig/Properties parameter in all the methods instead. Then we can just call the appropriate Consumer constructor based on whether the deserializers have been set in the passed-in ConsumerParameters. > TimeWindowedDeserializer doesn't allow users to set a custom window size > > > Key: KAFKA-10366 > URL: https://issues.apache.org/jira/browse/KAFKA-10366 > Project: Kafka > Issue Type: Bug >Reporter: Leah Thomas >Assignee: Leah Thomas >Priority: Major > Labels: streams > > Related to [KAFKA-4468|https://issues.apache.org/jira/browse/KAFKA-4468], in > timeWindowedDeserializer Long.MAX_VALUE is used as _windowSize_ for any > deserializer that uses the default constructor. While streams apps can pass > in a window size in serdes or while creating a timeWindowedDeserializer, the > deserializer that is actually used in processing the messages is created by > the Kafka consumer, without passing in the set windowSize. The deserializer > the consumer creates uses the configs, but as there is no config for > windowSize, the window size is always default. > See _KStreamAggregationIntegrationTest #ShouldReduceWindowed()_ as an example > of this issue. Despite passing in the windowSize to both the serdes and the > timeWindowedDeserializer, the window size is set to Long.MAX_VALUE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
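To make the gap concrete, here is a sketch of a deserializer that picks up its window size from configure(), which is the only hook an instance created reflectively by the consumer gets. The "window.size.ms" key is an illustrative assumption, not an existing config here, and this wrapper is not the actual TimeWindowedDeserializer.
{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// The consumer instantiates deserializers via the default constructor and then
// calls configure(configs, isKey), so a window size passed only to a hand-built
// instance never reaches the one actually used during processing.
public class ConfigurableWindowedDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> inner;
    private Long windowSize; // falls back to Long.MAX_VALUE if never configured

    public ConfigurableWindowedDeserializer(final Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        final Object size = configs.get("window.size.ms"); // hypothetical config key
        windowSize = size != null ? Long.parseLong(size.toString()) : Long.MAX_VALUE;
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        // A real windowed implementation would use windowSize here to
        // reconstruct the window end time from the serialized start time.
        return inner.deserialize(topic, data);
    }
}
{code}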
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188116#comment-17188116 ] Guozhang Wang commented on KAFKA-10134: --- Hello [~zhowei] that broker-side change is not mandatory, I just included that part to make the whole PR complete, but it is not a necessary change for your situation. > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.5.2, 2.6.1 > > Attachments: consumer3.log.2020-08-20.log, > consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10072) Kafkaconsumer is configured with different clientid parameters to obtain different results
[ https://issues.apache.org/jira/browse/KAFKA-10072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ankit Kumar reassigned KAFKA-10072: --- Assignee: Ankit Kumar > Kafkaconsumer is configured with different clientid parameters to obtain > different results > -- > > Key: KAFKA-10072 > URL: https://issues.apache.org/jira/browse/KAFKA-10072 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0 > Environment: centos7.6 8C 32G >Reporter: victor >Assignee: Ankit Kumar >Priority: Blocker > > kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property > {color:#DE350B}client.id=aa{color} --from-beginning --topic topicA > {color:#DE350B}There's no data > {color} > kafka-console-consumer.sh --bootstrap-server host1:port --consumer-property > {color:#DE350B}clientid=bb{color} --from-beginning --topic topicA > {color:#DE350B}Successfully consume data{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10453) Backport of PR-7781
[ https://issues.apache.org/jira/browse/KAFKA-10453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Niketh Sabbineni resolved KAFKA-10453. -- Resolution: Workaround > Backport of PR-7781 > --- > > Key: KAFKA-10453 > URL: https://issues.apache.org/jira/browse/KAFKA-10453 > Project: Kafka > Issue Type: Wish > Components: clients >Affects Versions: 1.1.1 >Reporter: Niketh Sabbineni >Priority: Major > > We have been hitting this bug (with kafka 1.1.1) where the Producer takes > forever to load metadata. The issue seems to have been patched in master > [here|https://github.com/apache/kafka/pull/7781]. > Would you *recommend* a backport of the above change to 1.1? There are 7-8 > changes that need to be cherry picked. The other option is to upgrade to 2.5 > (which would be much more involved) -- This message was sent by Atlassian Jira (v8.3.4#803005)