[GitHub] [kafka] inponomarev edited a comment on pull request #9107: KAFKA-5488: KIP-418 implementation
inponomarev edited a comment on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-666749809 ⚠️ Two differences from the KIP specification, discussion needed ⚠️ 1. Instead of multiple overloaded variants of `Branched.with`, we now have `Branched.withFunction` and `Branched.withConsumer`. This is because of compiler warnings about the overloads (`Function` and `Consumer` being indistinguishable when supplied as lambdas). 2. 'Fully covariant' signatures like `Consumer>` don't work as expected. Used `Consumer>` instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
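To illustrate point 1, here is a minimal standalone sketch (hypothetical class and method names, not the actual `Branched` API) of why two overloads taking a `Function` and a `Consumer` cannot be told apart when the argument is an implicitly typed lambda:

```java
import java.util.function.Consumer;
import java.util.function.Function;

public class OverloadAmbiguity {

    // Two overloads that differ only in the functional interface they accept.
    static void with(final Function<String, String> chain) {
        System.out.println("Function overload: " + chain.apply("stream"));
    }

    static void with(final Consumer<String> chain) {
        chain.accept("stream");
    }

    public static void main(final String[] args) {
        // Explicitly typed arguments resolve fine:
        with((Function<String, String>) s -> s + "-mapped");
        with((Consumer<String>) System.out::println);

        // But an implicitly typed lambda matches both overloads and does not compile:
        // with(s -> s.trim());   // error: reference to with is ambiguous
    }
}
```

Giving the methods distinct names (`withFunction`/`withConsumer`) sidesteps the ambiguity without forcing callers to cast their lambdas.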
[jira] [Commented] (KAFKA-8328) Kafka smooth expansion
[ https://issues.apache.org/jira/browse/KAFKA-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168514#comment-17168514 ] ChenLin commented on KAFKA-8328: Thanks for your attention, I will resubmit the patch against the trunk version. > Kafka smooth expansion > -- > > Key: KAFKA-8328 > URL: https://issues.apache.org/jira/browse/KAFKA-8328 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 0.10.2.0 >Reporter: ChenLin >Priority: Major > Labels: Kafka, expansion > Fix For: 0.10.2.0 > > Attachments: DiskUtil.png, Kafka_smooth_expansion.patch, > producerP999.png > > > When expanding the kafka cluster, the new follower will read the data from > the earliest offset. This can result in a large amount of historical data > being read from the disk, putting a lot of pressure on the disk and affecting > the performance of the kafka service, for example, the producer write latency > will increase. In general, kafka's original expansion mechanism has the > following problems: > 1. The new follower will put a lot of pressure on the disk; > 2. Causes the producer write latency to increase; > 3. Causes the consumer read latency to increase; > In order to solve these problems, we have proposed a solution for > smooth expansion. The main idea of the scheme is that the newly added > follower reads data from the HW position, and when the newly added follower > reads the data to a certain time threshold or data size threshold, the > follower enters the ISR queue. Since the new follower reads data from the > HW location, most of the data read is in the operating system's cache, so it > does not put pressure on the disk and does not affect the performance of the > kafka service, thus solving the above problems. > In order to illustrate the problems of the original expansion scheme, > we have done some tests, and there are corresponding test charts in the > attachment. > !producerP999.png! > !DiskUtil.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8328) Kafka smooth expansion
[ https://issues.apache.org/jira/browse/KAFKA-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168515#comment-17168515 ] ChenLin commented on KAFKA-8328: Thanks for your attention, I will resubmit the patch against the trunk version. [~roncenzhao] [~xilangyan] > Kafka smooth expansion > -- > > Key: KAFKA-8328 > URL: https://issues.apache.org/jira/browse/KAFKA-8328 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Affects Versions: 0.10.2.0 >Reporter: ChenLin >Priority: Major > Labels: Kafka, expansion > Fix For: 0.10.2.0 > > Attachments: DiskUtil.png, Kafka_smooth_expansion.patch, > producerP999.png > > > When expanding the kafka cluster, the new follower will read the data from > the earliest offset. This can result in a large amount of historical data > being read from the disk, putting a lot of pressure on the disk and affecting > the performance of the kafka service, for example, the producer write latency > will increase. In general, kafka's original expansion mechanism has the > following problems: > 1. The new follower will put a lot of pressure on the disk; > 2. Causes the producer write latency to increase; > 3. Causes the consumer read latency to increase; > In order to solve these problems, we have proposed a solution for > smooth expansion. The main idea of the scheme is that the newly added > follower reads data from the HW position, and when the newly added follower > reads the data to a certain time threshold or data size threshold, the > follower enters the ISR queue. Since the new follower reads data from the > HW location, most of the data read is in the operating system's cache, so it > does not put pressure on the disk and does not affect the performance of the > kafka service, thus solving the above problems. > In order to illustrate the problems of the original expansion scheme, > we have done some tests, and there are corresponding test charts in the > attachment. > !producerP999.png! > !DiskUtil.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores
[ https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168521#comment-17168521 ] Bruno Cadonna commented on KAFKA-10137: --- I also noticed this the last time I looked at {{ChangeLoggingWindowBytesStore}}. Since the sequence number is not incremented if {{retainDuplicates}} is false, I think it is OK from a correctness point of view. Maybe the reason we also write the sequence number when {{retainDuplicates}} is false is that this way we do not need to distinguish whether the key has a sequence number or not when we read the keys back. > Clean-up retain Duplicate logic in Window Stores > > > Key: KAFKA-10137 > URL: https://issues.apache.org/jira/browse/KAFKA-10137 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Minor > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > The logic to maintain and append the sequence number is present in multiple > locations, namely in the changelogging window store and in its underlying > window stores. We should consolidate this code into a single location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
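As a rough illustration of the duplicate-retention trick described in the ticket (names are made up, not the actual Streams internals): the serialized key is suffixed with a 4-byte sequence number that only advances when {{retainDuplicates}} is true, so otherwise-identical keys land in distinct slots of the unique-key store, while the suffix is still written in the non-duplicate case:

{code:java}
import java.nio.ByteBuffer;

public class DuplicateRetainingKey {
    private int seqnum = 0;

    byte[] wrap(final byte[] serializedKey, final boolean retainDuplicates) {
        if (retainDuplicates) {
            // only advance the sequence number when duplicates must be kept apart
            seqnum = (seqnum + 1) & 0x7FFFFFFF;
        }
        // the suffix is written either way, so readers never need to guess the key layout
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
                .put(serializedKey)
                .putInt(seqnum)
                .array();
    }

    public static void main(final String[] args) {
        final DuplicateRetainingKey keys = new DuplicateRetainingKey();
        System.out.println(keys.wrap("k".getBytes(), true).length);  // 5 bytes: 1 key byte + 4-byte suffix
        System.out.println(keys.wrap("k".getBytes(), false).length); // also 5 bytes: suffix written, not advanced
    }
}
{code}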
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-667010664 The test failure is unrelated. It's `org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > shouldUpgradeFromEosAlphaToEosBeta[true]` again This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10331) MirrorMaker2 active-active replication without topic renaming
Wojtek Konsek created KAFKA-10331: - Summary: MirrorMaker2 active-active replication without topic renaming Key: KAFKA-10331 URL: https://issues.apache.org/jira/browse/KAFKA-10331 Project: Kafka Issue Type: Wish Components: mirrormaker Reporter: Wojtek Konsek MirrorMaker2 as implemented in Kafka 2.5.x does support active-active replication from source cluster topic "A" to destination cluster topic "A", but if replication is enabled in both directions, the messages are duplicated again and again. Are there any plans to support bi-directional active-active replication, replicating from topic A on cluster 1 to topic A on cluster 2, and from topic A (same name) on cluster 2 to topic A on cluster 1? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10332) MirrorMaker2 fails to detect topic if remote topic is created first
[ https://issues.apache.org/jira/browse/KAFKA-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-10332: --- Summary: MirrorMaker2 fails to detect topic if remote topic is created first (was: MirrorMaker2 fails to detect topic is remote topic is created first) > MirrorMaker2 fails to detect topic if remote topic is created first > --- > > Key: KAFKA-10332 > URL: https://issues.apache.org/jira/browse/KAFKA-10332 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > Setup: > - 2 clusters: source and target > - Mirroring data from source to target > - create a topic called source.mytopic on the target cluster > - create a topic called mytopic on the source cluster > At this point, MM2 does not start mirroring the topic. > This also happens if you delete and recreate a topic that is being mirrored. > The issue is in > [refreshTopicPartitions()|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L211-L232] > which basically does a diff between the 2 clusters. > When creating the topic on the source cluster last, it makes the partition > list of both clusters match, hence not triggering a reconfiguration -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10332) MirrorMaker2 fails to detect topic is remote topic is created first
Mickael Maison created KAFKA-10332: -- Summary: MirrorMaker2 fails to detect topic is remote topic is created first Key: KAFKA-10332 URL: https://issues.apache.org/jira/browse/KAFKA-10332 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 2.6.0 Reporter: Mickael Maison Assignee: Mickael Maison Setup: - 2 clusters: source and target - Mirroring data from source to target - create a topic called source.mytopic on the target cluster - create a topic called mytopic on the source cluster At this point, MM2 does not start mirroring the topic. This also happens if you delete and recreate a topic that is being mirrored. The issue is in [refreshTopicPartitions()|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L211-L232] which basically does a diff between the 2 clusters. When creating the topic on the source cluster last, it makes the partition list of both clusters match, hence not triggering a reconfiguration -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10333) Provide an API to retrieve Kafka Connect task configurations
Mickael Maison created KAFKA-10333: -- Summary: Provide an API to retrieve Kafka Connect task configurations Key: KAFKA-10333 URL: https://issues.apache.org/jira/browse/KAFKA-10333 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Mickael Maison Assignee: Mickael Maison Kafka Connect exposes an API to retrieve configurations from connectors. Connectors are responsible for creating tasks. When doing so, they have to build configurations for individual tasks. In some cases, the configuration can be passed as-is to tasks, but in others it is mutated to make tasks do specific work. For example with MirrorSourceConnector, the connector configuration has a field "topics" which is a list of topics and patterns to mirror. When the connector builds task configurations, it resolves the list of topic names and patterns to exact partitions and spreads the partitions over all the tasks. It would be useful to identify the exact configuration each task is given. For MM2, it would allow identifying the partitions that matched the topics field. It would also help in understanding the impact when a task fails. -- This message was sent by Atlassian Jira (v8.3.4#803005)
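For illustration, a hedged Java sketch shaped like {{Connector#taskConfigs(int)}} from the Connect API; the "task.partitions" key and the round-robin split are invented for this example and are not MirrorSourceConnector's actual logic:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {

    // Same shape as Connector#taskConfigs(int maxTasks): one config map per task.
    static List<Map<String, String>> taskConfigs(final List<String> resolvedPartitions, final int maxTasks) {
        final int numTasks = Math.min(maxTasks, resolvedPartitions.size());
        final List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            configs.add(new HashMap<>(Map.of("task.partitions", "")));
        }
        // Round-robin the resolved partitions over the tasks; this is the "mutation"
        // that is invisible from the connector-level configuration alone.
        for (int i = 0; i < resolvedPartitions.size(); i++) {
            final Map<String, String> cfg = configs.get(i % numTasks);
            final String existing = cfg.get("task.partitions");
            cfg.put("task.partitions",
                    existing.isEmpty() ? resolvedPartitions.get(i) : existing + "," + resolvedPartitions.get(i));
        }
        return configs;
    }

    public static void main(final String[] args) {
        System.out.println(taskConfigs(List.of("mytopic-0", "mytopic-1", "mytopic-2"), 2));
    }
}
{code}

An API that returns these per-task maps would make exactly this kind of split visible to operators.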
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r463514179 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: You can use the following to get it right without the need to do the check for the e2e latency before filtering ``` .filter(m -> m.metricName().tags().containsKey(tagKey) && (m.metricName().group().equals(group0100To24) || m.metricName().group().equals(STATE_STORE_LEVEL_GROUP)) ).collect(Collectors.toList()); ``` The reason for the difference between the KV store and the window store is that they are used in different tests with different number of state stores. The test that uses the KV stores tests three different types of KV stores, namely in-memory, rocksdb, and in-memory-lru-cache. For each of this types the old group name changes. That is also the reason we need to pass the parameter `group0100To24` to `checkKeyValueStoreMetrics()`. In `checkWindowStoreAndSuppressionBufferMetrics()` we need to filter for four groups, because the corresponding test uses suppression and window state store. Suppression buffers had their own group in the old version. In the new version they moved into the state store group. Those groups are `BUFFER_LEVEL_GROUP_0100_TO_24` and `STATE_STORE_LEVEL_GROUP`. The window state store had their own group in the old version, i.e., `STATE_STORE_LEVEL_GROUP_ROCKSDB_WINDOW_STORE_0100_TO_24` (we are only using RocksDB-based window stores in the test). Finally, during the implementation of KIP-444, we discovered that we named a group incorrectly. That's why we filter also for group `stream-rocksdb-window-metrics`. So to sum up, it is hard to compare the verifications for KV stores and window stores, because they are used in different tests. Sorry, I should have been clearer on that before. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10334) Transactions not working properly
Luis Araujo created KAFKA-10334: --- Summary: Transactions not working properly Key: KAFKA-10334 URL: https://issues.apache.org/jira/browse/KAFKA-10334 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 2.3.0, 2.1.0 Reporter: Luis Araujo I'm using transactions provided by the Kafka Producer API in a Scala project built with SBT. The dependency used in the project is: "org.apache.kafka" % "kafka-clients" % "2.1.0" I followed the documentation and expected transactions to fail on commitTransaction() if a problem occurs while sending a message, as described in the documentation: [https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html ] Unfortunately, when testing this behaviour with a message larger than the size accepted by the Kafka broker/cluster, the transactions are not working properly. I tested with a 3-broker Kafka cluster with a 1MB max message size (the default value): - when the message is 1MB, the transaction is aborted and an exception is raised when calling commitTransaction() - when the message is bigger than 1MB, the transaction completes successfully without the message being written, and no exception is thrown. As an example, this means that when I produce 9 messages of 1 KB and 1 message of 1.1MB in the same transaction, the transaction completes but only 9 messages are written to the Kafka cluster. I tested this behaviour with Kafka versions 2.1.0 and 2.3.0 for both the Kafka cluster and the Kafka Producer API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
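The report is from a Scala project, but the pattern under test corresponds roughly to the following Java sketch (broker address, topic name, and payload sizes are placeholders); the expectation, per the KafkaProducer javadoc, is that commitTransaction() surfaces any failed send() in the transaction:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalSendSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "tx-example");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 9; i++) {
                    producer.send(new ProducerRecord<>("my-topic", new byte[1024]));       // 1 KB each
                }
                producer.send(new ProducerRecord<>("my-topic", new byte[1100 * 1024]));    // ~1.1 MB, over the broker limit
                producer.commitTransaction(); // the report expects this to throw if any send failed
            } catch (final KafkaException e) {
                // simplified: the javadoc pattern treats fenced/fatal exceptions differently from abortable ones
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
{code}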
[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
cadonna commented on a change in pull request #9098: URL: https://github.com/apache/kafka/pull/9098#discussion_r463578816 ## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java ## @@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final TaskId taskId, final this.metrics = new StreamsMetricsImpl( new Metrics(metricConfig), threadId, - streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG) + streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), +Time.SYSTEM Review comment: I would prefer to postpone that, because currently it is not strictly needed and the time is only used in the RocksDB recording trigger that records only internal RocksDB metrics. I do not see how exposing time would be useful for users during testing. If anybody complains, we can still do it in a future KIP. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10335) Blocking of producer IO thread when calling send() from callback
Alexander Sibiryakov created KAFKA-10335: Summary: Blocking of producer IO thread when calling send() from callback Key: KAFKA-10335 URL: https://issues.apache.org/jira/browse/KAFKA-10335 Project: Kafka Issue Type: Bug Components: clients, producer Reporter: Alexander Sibiryakov We had an application that used KafkaProducer to deliver the results of some work. Sometimes delivery wasn't successful because of network connectivity errors or maintenance happening on the broker side. In such cases we wanted the application to send an event with the error and the original message details, and we wanted these errors delivered to a separate topic. So we implemented a callback in the send() method, using the same producer instance and calling send() from there. This application worked for some time, but then we found that its producer was stuck: almost no CPU consumption and batches expiring for hours. After attaching a debugger, it turned out that the sender IO thread was blocking. When a record expired, the callback was called, which contained a call to send() for a new topic whose metadata was not present in the producer's client cache. When send() is missing metadata, it is allowed to block for up to the max.block.ms interval, which is 60 secs by default. If the application is active, this quickly results in a large number of accumulated records, and every record blocks the IO thread for 60s. Therefore the sender IO thread is essentially blocked. In the producer, only the sender IO thread calls the client's poll() method, which is responsible for all network communication and metadata updates. If poll() is executed with significant delay, it results in errors connected with various timeouts. In short, we've got a stuck producer with little chance to recover. To summarise, the prerequisites for the problem are sending from a callback, using the same producer instance, and using a topic that hasn't been seen before. I think it is important to decide whether the issue is KafkaProducer misuse or a bug. Code in callbacks shouldn't block, that is clear, but at the same time, no one expects an already initialised producer to block. Depending on the decision I could produce a fix: either a warning when the user tries to call send() from a callback, or a reduction of the maximum allowed blocking time for the metadata update. It could also be just a docs change, or even nothing. Here is code to reproduce the issue; the output it produces follows the code snippet. Tested on Confluent Cloud, from my desktop with a 100 Mbps connection.
{code:java}
public static void main(String[] args) throws IOException {
    byte[] blob = new byte[262144];
    Properties properties = new Properties();
    properties.load(new FileReader("kafka-staging.properties"));
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    properties.setProperty("request.timeout.ms", "5000");
    properties.setProperty("delivery.timeout.ms", "5000");
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
    while (true) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println(exception);
                    long start = System.currentTimeMillis();
                    ProducerRecord<String, byte[]> errorRecord = new ProducerRecord<>("alex-test-errors", blob);
                    producer.send(errorRecord); // blocking caused by metadata update
                    long timeElapsed = System.currentTimeMillis() - start;
                    System.err.println("time spent blocking IO thread: " + timeElapsed);
                }
            }
        });
    }
}
{code} {noformat} [2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092] buffer.memory = 33554432 client.dns.lookup = default client.id = compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 5000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata
[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
cadonna commented on a change in pull request #9098: URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) { throw new ProcessorStateException(fatal); } -// Setup metrics before the database is opened, otherwise the metrics are not updated +// Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpMetricsRecorder(configs); +maybeSetUpStatistics(configs); openRocksDB(dbOptions, columnFamilyOptions); open = true; + +addValueProvidersToMetricsRecorder(configs); } -private void maybeSetUpMetricsRecorder(final Map configs) { -if (userSpecifiedOptions.statistics() == null && +private void maybeSetUpStatistics(final Map configs) { +if (userSpecifiedOptions.statistics() != null) { +userSpecifiedStatistics = true; +} +if (!userSpecifiedStatistics && RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { -isStatisticsRegistered = true; // metrics recorder will clean up statistics object final Statistics statistics = new Statistics(); userSpecifiedOptions.setStatistics(statistics); -metricsRecorder.addStatistics(name, statistics); +} +} + +private void addValueProvidersToMetricsRecorder(final Map configs) { +final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); +final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics(); +if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { +final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); +metricsRecorder.addValueProviders(name, db, cache, statistics); +} else { +metricsRecorder.addValueProviders(name, db, null, statistics); +log.warn("A table format configuration is used that does not expose the block cache. This means " + +"that metrics that relate to the block cache may be wrong if the block cache is shared."); } Review comment: I agree with you that it is not ideal and thank you for this lesson on reflection. Indeed, I do not like reflection in this case, because it makes the code too much dependent on RocksDB internals. We should use reflection to check if the public API to configure RocksDB changed in a newer version, but that is another story. I do not understand how the alternative of wrapping `BlockBasedTableConfig` into `BlockBasedTableConfigWithAccessibleCache` should work. Since the cache is not accessible in `BlockBasedTableConfig` it will also not be accessible when it is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the name). We need to get the reference to the cache when the cache is set in `BlockBasedTableConfig`. If the cache is already set we can only use reflection. Since the block based table format is the only format in RocksDB that uses the cache, I do not see why a user absolutely needs to pass a new `BlockBasedTableConfig` object. I think for now it is OK to log a warning, and clearly document that the provided `BlockBasedTableConfig` object should be used. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
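For readers following the thread, the idea behind `BlockBasedTableConfigWithAccessibleCache` is roughly the following (an illustrative sketch assuming rocksdbjni on the classpath, not the actual Streams class): capture the block cache reference at the moment it is set, because the plain `BlockBasedTableConfig` never exposes it again.

```java
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;

public class BlockBasedTableConfigWithAccessibleCacheSketch extends BlockBasedTableConfig {

    private Cache blockCache;

    @Override
    public BlockBasedTableConfig setBlockCache(final Cache cache) {
        // remember the reference so the metrics recorder can read cache-level properties later
        blockCache = cache;
        return super.setBlockCache(cache);
    }

    public Cache blockCache() {
        return blockCache;
    }
}
```

This also shows why a plain `BlockBasedTableConfig` created by the user cannot simply be wrapped after the fact: once the cache has been set on the plain config, the reference is no longer reachable without reflection.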
[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long
rajinisivaram commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-667147526 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method
abbccdda commented on pull request #9096: URL: https://github.com/apache/kafka/pull/9096#issuecomment-667173698 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes
hachikuji opened a new pull request #9109: URL: https://github.com/apache/kafka/pull/9109 Add some notable changes to the reassignment tool for the 2.6 release. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
abbccdda commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r463682515 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception { } } +@Test +public void testListOffsetsPartialResponse() throws Exception { Review comment: Good coverage ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) { leader, tp); partitionsToRetry.add(tp); } else { -partitionDataMap.put(tp, new ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch)); +int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH); +partitionDataMap.put(tp, new ListOffsetPartition() +.setPartitionIndex(tp.partition()) +.setTimestamp(offset) +.setCurrentLeaderEpoch(currentLeaderEpoch)); } } } return regroupPartitionMapByNode(partitionDataMap); } +private static List toListOffsetTopics(Map timestampsToSearch) { Review comment: Let's move this helper into `ListOffsetRequest` ## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ## @@ -47,96 +42,11 @@ public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; -// top level fields -private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id", -"Broker id of the follower. For normal consumers, use -1."); -private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level", -"This setting controls the visibility of transactional records. " + -"Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " + -"(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. 
" + -"To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " + -"LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " + -"result, which allows consumers to discard ABORTED transactional records"); -private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics", -"Topics to list offsets."); - -// topic level fields -private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions", -"Partitions to list offsets."); - -// partition level fields -private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", -"The target timestamp for the partition."); -private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets", -"Maximum offsets to return."); - -private static final Field PARTITIONS_V0 = PARTITIONS.withFields( -PARTITION_ID, -TIMESTAMP, -MAX_NUM_OFFSETS); - -private static final Field TOPICS_V0 = TOPICS.withFields( -TOPIC_NAME, -PARTITIONS_V0); - -private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema( -REPLICA_ID, -TOPICS_V0); - -// V1 removes max_num_offsets -private static final Field PARTITIONS_V1 = PARTITIONS.withFields( -PARTITION_ID, -TIMESTAMP); - -private static final Field TOPICS_V1 = TOPICS.withFields( -TOPIC_NAME, -PARTITIONS_V1); - -private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema( -REPLICA_ID, -TOPICS_V1); - -// V2 adds a field for the isolation level -private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( -REPLICA_ID, -ISOLATION_LEVEL, -TOPICS_V1); - -// V3 bump used to indicate that on quota violation brokers send out responses before throttling. -private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2; - -// V4 introduces the current leader epoch, which is used for fencing -private static final Field PARTITIONS_V4 = PARTITIONS.withFields( -PARTITION_ID, -CURRENT_LEADER_EPOCH, -TIMESTAMP); - -private static final Field TOPICS_V4 = TOPICS.withFields( -TOPIC_NAME, -PARTITIONS_V4); - -private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema( -REPLICA_ID, -ISOLATION_LEVEL, -TOPICS_V4); - -// V5 b
[jira] [Created] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
John Roesler created KAFKA-10336: Summary: Rolling upgrade with Suppression AND Standbys may throw exceptions Key: KAFKA-10336 URL: https://issues.apache.org/jira/browse/KAFKA-10336 Project: Kafka Issue Type: Bug Components: streams Reporter: John Roesler Tl;dr: If you have standbys AND use Suppress with changelogging enabled, you may experience exceptions leading to threads shutting down on the OLD instances during a rolling upgrade. No corruption is expected, and when the rolling upgrade completes, all threads will be running and processing correctly. Details: The Suppression changelog has had to change its internal data format several times to fix bugs. The binary schema of the changelog values is determined by a version header on the records, and new versions are able to decode all old versions' formats. The suppression changelog decoder is also configured to throw an exception if it encounters a version number that it doesn't recognize, causing the thread to stop processing and shut down. When standbys are configured, there is one so-called "active" worker writing into the suppression buffer and sending the same messages into the changelog, while another "standby" worker reads those messages, decodes them, and maintains a hot-standby replica of the suppression buffer. If the standby worker is running an older version of Streams than the active worker, what can happen today is that the active worker may write changelog messages with a higher version number than the standby worker can understand. When the standby worker receives one of these messages, it will throw the exception and shut down its thread. Note, although the exceptions are undesired, at least this behavior protects the integrity of the application and prevents data corruption or loss. Workaround: Several workarounds are possible: This only affects clusters that do all of (A) rolling bounce, (B) suppression, (C) standby replicas, (D) changelogged suppression buffers. Changing any of those four variables will prevent the issue from occurring. I would NOT recommend disabling (D), and (B) is probably off the table, since the application logic presumably depends on it. Therefore, your practical choices are to disable standbys (C), or to do a full-cluster bounce (A). Personally, I think (A) is the best option. Also note, although the exceptions and threads shutting down are not ideal, they would only afflict the old-versioned nodes. I.e., the nodes you intend to replace anyway. So another "workaround" is simply to ignore the exceptions and proceed with the rolling bounce. As the old-versioned nodes are replaced with new-versioned nodes, the new nodes will again be able to decode their peers' changelog messages and be able to maintain the hot-standby replicas of the suppression buffers. Detection: Although I really should have anticipated this condition, I first detected it while expanding our system test coverage as part of KAFKA-10173. I added a rolling upgrade test with an application that uses both suppression and standby replicas, and observed that the rolling upgrades would occasionally cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the rolling-upgrade configuration and only do full-cluster upgrades. Resolving _this_ ticket will allow us to re-enable rolling upgrades.
Proposed solution: Part 1: Since Streams can decode both current and past versions, but not future versions, we need to implement a mechanism to prevent new-versioned nodes from writing new-versioned messages, which would appear as future-versioned messages to the old-versioned nodes. We have an UPGRADE_FROM configuration that we could leverage to accomplish this. In that case, when upgrading from 2.3 to 2.4, you would set UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) nodes would continue writing messages in the old (2.3) format. Thus, the still-running old nodes will still be able to read them. Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. Post-bounce, the nodes would start writing in the 2.4 format, which is ok because all the members are running 2.4 at this point and can decode these messages, even if they are still configured to write with version 2.3. After the second rolling bounce, the whole cluster is both running 2.4 and writing with the 2.4 format. Part 2: Managing two rolling bounces can be difficult, so it is also desirable to implement a mechanism for automatically negotiating the schema version internally. In fact, this is already present in Streams, and it is called "version probing". Right now, version probing is used to enable the exact same kind of transition from an old-message-format to a new-message-format when both old and new members are in the cluster, but it is only
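Regarding the two-bounce procedure in Part 1, a sketch of what the operator-facing side would look like, reusing the existing upgrade.from config (the suppression-format gating itself is the proposal here, not current behavior; application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TwoBounceUpgradeSketch {
    public static void main(final String[] args) {
        // Bounce 1: deploy the new binaries with upgrade.from pinned to the old version,
        // so (under the proposal) new nodes keep writing the old changelog format.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");

        // Bounce 2: once every instance runs the new version, remove upgrade.from and
        // bounce again; only then does the cluster switch to the new format.
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        System.out.println("second-bounce config: " + props);
    }
}
{code}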
[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
[ https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168977#comment-17168977 ] John Roesler commented on KAFKA-10322: -- Hi all, Thanks for your insights [~ableegoldman] ! I realized after reading your comment that I'd never written down the full extent of the idea you referenced. I've just documented it here: https://issues.apache.org/jira/browse/KAFKA-10336 It seems like we could use the same general mechanism to provide an upgrade path to a version of Streams that solves this issue. Thanks, -John > InMemoryWindowStore restore keys format incompatibility (lack of > sequenceNumber in keys on topic) > - > > Key: KAFKA-10322 > URL: https://issues.apache.org/jira/browse/KAFKA-10322 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 > Environment: windows/linux >Reporter: Tomasz Bradło >Priority: Major > > I have regular groupBy&Counting stream configuration: > {code:java} > > fun addStream(kStreamBuilder: StreamsBuilder) { > val storeSupplier = Stores.inMemoryWindowStore("count-store", > Duration.ofDays(10), > Duration.ofDays(1), > false) > val storeBuilder: StoreBuilder> = > Stores > .windowStoreBuilder(storeSupplier, > JsonSerde(CountableEvent::class.java), Serdes.Long()) > kStreamBuilder > .stream("input-topic", Consumed.with(Serdes.String(), > Serdes.String())) > .map {_, jsonRepresentation -> > KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} > .groupByKey() > .windowedBy(TimeWindows.of(Duration.ofDays(1))) > > .count(Materialized.with(JsonSerde(CountableEvent::class.java), > Serdes.Long())) > .toStream() > .to("topic1-count") > val storeConsumed = > Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), > Duration.ofDays(1).toMillis()), Serdes.Long()) > kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", > storeConsumed, passThroughProcessorSupplier) > }{code} > While sending to "topic1-count", for serializing the key > [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] > is used which is using > [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] > so the message key format is: > {code:java} > real_grouping_key + timestamp(8bytes){code} > > Everything works. I can get correct values from state-store. But, in recovery > scenario, when [GlobalStateManagerImpl > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]enters > offset < highWatermark loop then > [InMemoryWindowStore stateRestoreCallback > |https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]reads > from "topic1-count" and fails to extract valid key and timestamp using > [WindowKeySchema.extractStoreKeyBytes > |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]and > [WindowKeySchema.extractStoreTimestamp. 
> |https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]It > fails because it expects format: > {code:java} > real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} > How this is supposed to work in this case? -- This message was sent by Atlassian Jira (v8.3.4#803005)
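To make the mismatch concrete, a small standalone sketch of the two key layouts described in the issue (field order and widths as stated there; helper names are made up):

{code:java}
import java.nio.ByteBuffer;

public class WindowKeyLayouts {

    // Layout written to the topic by TimeWindowedSerializer / WindowKeySchema.toBinary:
    // grouping key + 8-byte window-start timestamp.
    static byte[] topicKey(final byte[] groupingKey, final long windowStartMs) {
        return ByteBuffer.allocate(groupingKey.length + Long.BYTES)
                .put(groupingKey).putLong(windowStartMs).array();
    }

    // Layout the store's restore callback expects:
    // grouping key + 8-byte timestamp + 4-byte sequence number.
    static byte[] storeKey(final byte[] groupingKey, final long windowStartMs, final int seqnum) {
        return ByteBuffer.allocate(groupingKey.length + Long.BYTES + Integer.BYTES)
                .put(groupingKey).putLong(windowStartMs).putInt(seqnum).array();
    }

    public static void main(final String[] args) {
        final byte[] key = "event-type-A".getBytes();
        System.out.println("topic key: " + topicKey(key, 0L).length
                + " bytes, store key: " + storeKey(key, 0L, 0).length + " bytes");
    }
}
{code}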
[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10336: - Priority: Blocker (was: Major) > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: John Roesler >Priority: Blocker > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. 
Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > still-running old nodes will still be able to read them. > Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. > Post-bounce, the nodes would start writing in the 2.4 format, which is ok > because all the members are running 2.4 at this point and can decode these > messages, even if they are still configured to write with version 2.3. > After the second rolling bounce, the whole cluster is both running 2.4 and > writing with the 2.4 format. > Part 2: > Managing two rolling bounces can be difficult, so it is also desirable to >
[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10336: - Affects Version/s: 2.6.0 2.3.0 2.4.0 2.5.0 > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: John Roesler >Priority: Blocker > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. 
Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > still-running old nodes will still be able to read them. > Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. > Post-bounce, the nodes would start writing in the 2.4 format, which is ok > because all the members are running 2.4 at this point and can decode these > messages, even if they are still configured to write with version 2.3. > After the second rolling bounce, t
[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10336: - Fix Version/s: 2.7.0 > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: John Roesler >Priority: Blocker > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. 
Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > still-running old nodes will still be able to read them. > Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. > Post-bounce, the nodes would start writing in the 2.4 format, which is ok > because all the members are running 2.4 at this point and can decode these > messages, even if they are still configured to write with version 2.3. > After the second rolling bounce, the whole cluster is both running 2.4 and > writing with the 2.4 format. > Part 2: > Managing two rolling bounces can be difficult, so it is
[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168979#comment-17168979 ] John Roesler commented on KAFKA-10336: -- I've upgraded this to a blocker for 2.7.0, so that we won't forget about it. It was a regression in 2.3.0 when we changed the suppression buffer format the first time, but we didn't detect it because of a testing gap. However, it's a pretty serious issue, and will only become more impactful as more people use suppression and as we make other internal topic format changes, for example in the fix for https://issues.apache.org/jira/browse/KAFKA-10322 > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: John Roesler >Priority: Blocker > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. 
As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > sti
[GitHub] [kafka] rhauch commented on a change in pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes
rhauch commented on a change in pull request #9109: URL: https://github.com/apache/kafka/pull/9109#discussion_r463705431 ## File path: docs/upgrade.html ## @@ -49,6 +49,14 @@ Notable changes in 2 Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER(6) instead of REPLICA_NOT_AVAILABLE(9) if the broker is not a replica, ensuring that this transient error during reassignments is handled by all clients as a retriable exception. +There are several notable changes to the reassignment tool kafka-reassign-partitions.sh +following the completion of +https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment";>KIP-455. +This tool now requires the --additional flag to be providing when changing the throttle of an Review comment: Nit: fix grammar: ```suggestion This tool now requires the --additional flag to be provided when changing the throttle of an ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting
vvcephei commented on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-667212439 Sounds good, @guozhangwang . Another option is to make an internal config so that we can parameterize the benchmarks and get a more thorough understanding of the impact of this change. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
abbccdda commented on a change in pull request #9078: URL: https://github.com/apache/kafka/pull/9078#discussion_r463708722 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ## @@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() { for (Map.Entry entry : this.metrics.entrySet()) { String attribute = entry.getKey(); KafkaMetric metric = entry.getValue(); +String metricType = double.class.getName(); + +try { +metricType = metric.metricValue().getClass().getName(); +} catch (NullPointerException e) { Review comment: It is weird to catch NPE, could we just check whether `metricValue` is defined? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
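For illustration, a self-contained sketch of the null check the reviewer is suggesting, assuming nothing beyond what the diff above shows; the Supplier stands in for KafkaMetric#metricValue() and this is not the actual patch.
```java
import java.util.function.Supplier;

public class MetricTypeSketch {
    // Derive the MBean attribute type from the metric value when it is non-null,
    // falling back to double, instead of catching NullPointerException.
    static String metricType(final Supplier<Object> metricValue) {
        final Object value = metricValue.get();
        return value == null ? double.class.getName() : value.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(metricType(() -> 42.0)); // java.lang.Double
        System.out.println(metricType(() -> null)); // double
    }
}
```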
[GitHub] [kafka] vvcephei edited a comment on pull request #8964: KAFKA-9450: Decouple flushing state from commiting
vvcephei edited a comment on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-667212439 Sounds good, @guozhangwang . Another option is to make an internal config so that we can parameterize the benchmarks and get a more thorough understanding of the impact of this change. Of course, we can easily transition from the hard-coded value to an internal config in a small follow-on PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9105: MINOR: closable object Memory leak prevention
abbccdda commented on a change in pull request #9105: URL: https://github.com/apache/kafka/pull/9105#discussion_r463708981 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -197,8 +197,8 @@ synchronized boolean lock(final TaskId taskId) throws IOException { final FileChannel channel; -try { -channel = getOrCreateFileChannel(taskId, lockFile.toPath()); +try (final FileChannel fileChannel = getOrCreateFileChannel(taskId, lockFile.toPath())) { Review comment: It looks weird to use the resource outside of the try block; does this really work? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
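To illustrate why the question is warranted: a try-with-resources block closes its resource when it exits, so anything assigned out of the block ends up closed. A minimal, standalone demonstration (the lock file path is a placeholder):
```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class TryWithResourcesChannelSketch {
    public static void main(String[] args) throws IOException {
        final Path lockFile = Paths.get("demo.lock"); // placeholder path
        FileChannel escaped = null;
        try (FileChannel channel = FileChannel.open(
                lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            escaped = channel;
            System.out.println("inside try:  open = " + channel.isOpen()); // true
        }
        // The channel was closed on exit from the try block, so a file lock taken
        // on it would have been released as well.
        System.out.println("outside try: open = " + escaped.isOpen()); // false
    }
}
```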
[GitHub] [kafka] vvcephei merged pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei merged pull request #8993: URL: https://github.com/apache/kafka/pull/8993 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-667214395 Thanks @guozhangwang ! I've merged to 2.5. If we do a 2.5.1 RC1, it'll be included. Otherwise, it'll go into 2.5.2. Either way, it will run as part of nightly branch tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-667233872 @chia7712 : So, it's just a rebase and there is no change in your PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes
hachikuji merged pull request #9109: URL: https://github.com/apache/kafka/pull/9109 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…
abbccdda commented on pull request #9108: URL: https://github.com/apache/kafka/pull/9108#issuecomment-667241038 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r463738138 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks, addNewTask(task); } } +} + +private void handleCloseAndRecycle(final List tasksToRecycle, + final List tasksToCloseClean, + final List tasksToCloseDirty, + final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final LinkedHashMap taskCloseExceptions) { +if (!tasksToCloseDirty.isEmpty()) { +throw new IllegalArgumentException("Tasks to close-dirty should be empty"); +} + +// for all tasks to close or recycle, we should first right a checkpoint as in post-commit +final List tasksToCheckpoint = new ArrayList<>(tasksToCloseClean); +tasksToCheckpoint.addAll(tasksToRecycle); +for (final Task task : tasksToCheckpoint) { +try { +// Always try to first suspend and commit the task before checkpointing it; +// some tasks may already be suspended which should be a no-op. +// +// Also since active tasks should already be suspended / committed and +// standby tasks should have no offsets to commit, we should expect nothing to commit +task.suspend(); + +// Note that we are not actually committing here but just check if we need to write checkpoint file: +// 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully, +//and their changelog positions should not change at all postCommit would not write the checkpoint again. +// 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably +//write the checkpoint file. +final Map offsets = task.prepareCommit(); Review comment: Alternatively, now that we enforce checkpoint during suspension, we could just remove the `pre/postCommit` for active tasks in `handleAssignment`. It just seems nice to be able to assert that we never call `pre/postCommit` after a task is suspended This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169038#comment-17169038 ] Sophie Blee-Goldman commented on KAFKA-10336: - Thanks for the detailed bug report. Quick question – are you sure this is only possible when standbys are enabled? It seems like you could end up in a situation where a suppression task ends up on an upgraded instance, which then completes restoration and starts writing to the changelog with the newer protocol. Then during a subsequent rebalance of the rolling upgrade, this task gets migrated back to an old instance, tries to read the new protocol version, and dies. There's some consolation here: this case is probably pretty rare, at least relative to the case with standbys enabled > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: John Roesler >Priority: Blocker > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. 
As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPG
[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores
[ https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169045#comment-17169045 ] Sophie Blee-Goldman commented on KAFKA-10137: - Yeah I thought that might be the case, although I'm not sure I agree with the reasoning behind it. You shouldn't be switching `retainDuplicates` on and off for an existing store; the changelog bytes should match the local store format. Maybe the theory was that users might try to build two stores off of the same changelog, one with duplicates and one without. I don't think we should support that either. On the other hand it's reasonable to write the output of a windowed aggregation to a topic and then use that as the source topic for a table/store/global store with or without duplicates. Unfortunately that is exactly [the case|https://issues.apache.org/jira/browse/KAFKA-10322] which is broken by this "bug" (whether intentional or not) > Clean-up retain Duplicate logic in Window Stores > > > Key: KAFKA-10137 > URL: https://issues.apache.org/jira/browse/KAFKA-10137 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.5.0 >Reporter: Bruno Cadonna >Priority: Minor > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > The logic to maintain and append the sequence number is present in multiple > locations, namely in the changelogging window store and in its underlying > window stores. We should consolidate this code to one single location. -- This message was sent by Atlassian Jira (v8.3.4#803005)
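As background for the discussion above, a rough sketch of the duplicate-retention trick the ticket describes: wrapping the serialized key with an incrementing sequence number so a unique-key store can hold "duplicates". The layout here is illustrative only, not Streams' actual binary format.
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RetainDuplicatesSketch {
    private static int seqnum = 0;

    // Append an incrementing sequence number to the serialized key so that two
    // inserts of the same key land under distinct composite keys.
    static byte[] wrapWithSeqnum(final byte[] serializedKey) {
        seqnum++;
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
            .put(serializedKey)
            .putInt(seqnum)
            .array();
    }

    public static void main(String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        final byte[] first = wrapWithSeqnum(key);
        final byte[] second = wrapWithSeqnum(key);
        System.out.println(java.util.Arrays.equals(first, second)); // false: distinct entries
    }
}
{code}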
[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
ableegoldman commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r463750941 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: I see, thanks for the explanation. The suggestion worked This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-10336: -- Labels: bug user-experience (was: ) > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: John Roesler >Priority: Blocker > Labels: bug, user-experience > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. 
Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > still-running old nodes will still be able to read them. > Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. > Post-bounce, the nodes would start writing in the 2.4 format, which is ok > because all the members are running 2.4 at this point and can decode these > messages, even if they are still configured to write with version 2.3. > After the second rolling bounce, the whole cluster is both runn
[GitHub] [kafka] ableegoldman commented on a change in pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method
ableegoldman commented on a change in pull request #9096: URL: https://github.com/apache/kafka/pull/9096#discussion_r463752139 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -144,15 +144,35 @@ private boolean allSubscriptionsEqual(Set allTopics, return true; } + +/** + * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. + * The method includes the following steps: + * + * 1. Reassign as many previously owned partitions as possible Review comment: ```suggestion * 1. Reassign as many previously owned partitions as possible, up to the maxQuota ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception
[ https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169052#comment-17169052 ] John Thomas commented on KAFKA-10186: - [~ableegoldman] If we abort a transaction with any non-flushed data, we want to throw a different exception, since we know it's non-fatal? If my understanding is correct, in Sender#maybeSendAndPollTransactionalRequest: transactionManager.hasAbortableError() -> this is fatal; transactionManager.isAborting() -> this is something we know has been aborted, and is recoverable. {code:java} if (transactionManager.hasAbortableError() || transactionManager.isAborting()) { if (accumulator.hasIncomplete()) { RuntimeException exception = transactionManager.lastError(); if (exception == null) { exception = new KafkaException("Failing batch since transaction was aborted"); } accumulator.abortUndrainedBatches(exception); } }{code} PS: #newbie! > Aborting transaction with pending data should throw non-fatal exception > --- > > Key: KAFKA-10186 > URL: https://issues.apache.org/jira/browse/KAFKA-10186 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > Currently if you try to abort a transaction with any pending (non-flushed) > data, the send exception is set to > {code:java} > KafkaException("Failing batch since transaction was aborted"){code} > This exception type is generally considered fatal, but this is a valid state > to be in -- the point of throwing the exception is to alert that the records > will not be sent, not that you are in an unrecoverable error state. > We should throw a different (possibly new) type of exception here to > distinguish from fatal and recoverable errors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
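A hedged sketch of the direction the ticket asks about: a dedicated exception type for the "batch aborted" case that callers could treat as recoverable, in place of the generic KafkaException shown in the snippet above. The class name is hypothetical and not necessarily what the eventual fix uses.
{code:java}
import org.apache.kafka.common.KafkaException;

// Hypothetical, recoverable exception type for batches failed by an aborted
// transaction; a sketch of the ticket's suggestion, not the actual implementation.
public class BatchAbortedException extends KafkaException {
    public BatchAbortedException() {
        super("Failing batch since transaction was aborted");
    }
}
{code}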
[jira] [Commented] (KAFKA-10316) Consider renaming getter method for Interactive Queries
[ https://issues.apache.org/jira/browse/KAFKA-10316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169062#comment-17169062 ] John Thomas commented on KAFKA-10316: - [~mjsax] The vote has been open for >72 hours, and the KIP got binding+1 (John Roesler) , non-binding+1 (Navinder Brar, Jorge Esteban, Bruno Cadonna) and no -1 votes. Need little help figuring this out , Will the approval be of type 1 or 2 ? # The [KIP process point 4|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process] says : The criteria for acceptance is [lazy majority|https://cwiki.apache.org/confluence/display/KAFKA/Bylaws#Bylaws-Approvals]. # At the same time in the [Bylaws|https://cwiki.apache.org/confluence/display/KAFKA/Bylaws#Bylaws-Approvals] for a "code change action" approval "one +1 from a committer who has not authored the patch followed by a Lazy approval (not counting the vote of the contributor), moving to lazy majority if a -1 is received" . > Consider renaming getter method for Interactive Queries > --- > > Key: KAFKA-10316 > URL: https://issues.apache.org/jira/browse/KAFKA-10316 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: John Thomas >Priority: Minor > Labels: beginner, need-kip, newbie > > In the 2.5 release, we introduce new classes for Interactive Queries via > KIP-535 (cf https://issues.apache.org/jira/browse/KAFKA-6144). The KIP did > not specify the names for getter methods of `KeyQueryMetadata` explicitly and > they were added in the PR as `getActiveHost()`, `getStandbyHosts()`, and > `getPartition()`. > However, in Kafka it is custom to not use the `get` prefix for getters and > thus the methods should have been added as `activeHost()`, `standbyHosts()`, > and `partition()`, respectively. > We should consider renaming the methods accordingly, by deprecating the > existing ones and adding the new ones in parallel. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
guozhangwang commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-667297400 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
guozhangwang commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-667297606 test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] Johnny-Malizia commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion
Johnny-Malizia commented on pull request #8936: URL: https://github.com/apache/kafka/pull/8936#issuecomment-667297977 Apologies for my inactivity here; I had to take a little more time to understand how and when the index files are currently being opened. I'll try to have this resolved sometime this weekend. My understanding matches what @junrao stated. I have now pushed up a change that I believe implements what we are all after. I think the implementation makes sense, but if anybody here knows a better way to do this, please let me know. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r463780418 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks, addNewTask(task); } } +} + +private void handleCloseAndRecycle(final List tasksToRecycle, + final List tasksToCloseClean, + final List tasksToCloseDirty, + final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final LinkedHashMap taskCloseExceptions) { +if (!tasksToCloseDirty.isEmpty()) { +throw new IllegalArgumentException("Tasks to close-dirty should be empty"); +} + +// for all tasks to close or recycle, we should first right a checkpoint as in post-commit +final List tasksToCheckpoint = new ArrayList<>(tasksToCloseClean); +tasksToCheckpoint.addAll(tasksToRecycle); +for (final Task task : tasksToCheckpoint) { +try { +// Always try to first suspend and commit the task before checkpointing it; +// some tasks may already be suspended which should be a no-op. +// +// Also since active tasks should already be suspended / committed and +// standby tasks should have no offsets to commit, we should expect nothing to commit +task.suspend(); + +// Note that we are not actually committing here but just check if we need to write checkpoint file: +// 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully, +//and their changelog positions should not change at all postCommit would not write the checkpoint again. +// 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably +//write the checkpoint file. +final Map offsets = task.prepareCommit(); Review comment: I'm leaning towards the second option here: in `handleAssignment` we actually do not commit at all, but only use 1) `preCommit` to validate certain states, and use 2) `postCommit` and expecting it to be a no-op actually. Let me see if I can get rid of those two in `handleAssignment`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman commented on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-667306865 Kicked off system tests https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4080/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
guozhangwang commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r463790190 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks, addNewTask(task); } } +} + +private void handleCloseAndRecycle(final List tasksToRecycle, + final List tasksToCloseClean, + final List tasksToCloseDirty, + final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final LinkedHashMap taskCloseExceptions) { +if (!tasksToCloseDirty.isEmpty()) { +throw new IllegalArgumentException("Tasks to close-dirty should be empty"); +} + +// for all tasks to close or recycle, we should first right a checkpoint as in post-commit +final List tasksToCheckpoint = new ArrayList<>(tasksToCloseClean); +tasksToCheckpoint.addAll(tasksToRecycle); +for (final Task task : tasksToCheckpoint) { +try { +// Always try to first suspend and commit the task before checkpointing it; +// some tasks may already be suspended which should be a no-op. +// +// Also since active tasks should already be suspended / committed and +// standby tasks should have no offsets to commit, we should expect nothing to commit +task.suspend(); + +// Note that we are not actually committing here but just check if we need to write checkpoint file: +// 1) for active tasks prepareCommit should return empty if it has committed during suspension successfully, +//and their changelog positions should not change at all postCommit would not write the checkpoint again. +// 2) for standby tasks prepareCommit should always return empty, and then in postCommit we would probably +//write the checkpoint file. +final Map offsets = task.prepareCommit(); Review comment: I tried some ways and ended-up with explicitly specifying suspend / postCommit for `standby` tasks only, and use `prepareCommit` to check if the previous revocation has failed or not. Personally I'm happy with the current workflow now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: KIP-418 implementation
vvcephei commented on pull request #9107: URL: https://github.com/apache/kafka/pull/9107#issuecomment-667352270 Hey @mjsax , do you have time to give this a first pass? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rgroothuijsen commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes
rgroothuijsen commented on a change in pull request #9078: URL: https://github.com/apache/kafka/pull/9078#discussion_r463845316 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ## @@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() { for (Map.Entry entry : this.metrics.entrySet()) { String attribute = entry.getKey(); KafkaMetric metric = entry.getValue(); +String metricType = double.class.getName(); + +try { +metricType = metric.metricValue().getClass().getName(); +} catch (NullPointerException e) { Review comment: @abbccdda That's what I figured as well at first, and it passes in unit tests, but there's a strange side effect. Upon starting `connect-distributed`, a whole bunch of NPEs appear for various metrics. It looks like when I call `metricValue()`, it eventually arrives [here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1811), and throws an error because `member` is null. I could also do a null check there, though I'm not sure what value the method would return in that case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes
[ https://issues.apache.org/jira/browse/KAFKA-7777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169176#comment-17169176 ] Guozhang Wang commented on KAFKA-7777: -- Another workaround for now is to use `KTable#mapValues()`, in which we can project out the unwanted fields and then materialize. More specifically, you can write: builder.table("topic", Consumed) // do not use Materialized to enforce materializing the source table .mapValues(..., Materialized) // project out those unwanted fields, and then materialize with the new serde As a result only one store would be created for the resulting KTable after the mapValues. > Decouple topic serdes from materialized serdes > -- > > Key: KAFKA-7777 > URL: https://issues.apache.org/jira/browse/KAFKA-7777 > Project: Kafka > Issue Type: Wish > Components: streams >Reporter: Maarten >Priority: Minor > Labels: needs-kip > > It would be valuable to us to have the encoding format in a Kafka topic > decoupled from the encoding format used to cache the data locally in a kafka > streams app. > We would like to use the `range()` function in the interactive queries API to > look up a series of results, but can't with our encoding scheme due to our > keys being variable length. > We use protobuf, but based on what I've read Avro, Flatbuffers and Cap'n > proto have similar problems. > Currently we use the following code to work around this problem: > {code} > builder > .stream("input-topic", Consumed.with(inputKeySerde, inputValueSerde)) > .to("intermediate-topic", Produced.with(intermediateKeySerde, > intermediateValueSerde)); > t1 = builder > .table("intermediate-topic", Consumed.with(intermediateKeySerde, > intermediateValueSerde), t1Materialized); > {code} > With the encoding formats decoupled, the code above could be reduced to a > single step, not requiring an intermediate topic. > Based on feedback on my [SO > question|https://stackoverflow.com/questions/53913571/is-there-a-way-to-separate-kafka-topic-serdes-from-materialized-serdes] > a change that introduces this would impact state restoration when using an > input topic for recovery. -- This message was sent by Atlassian Jira (v8.3.4#803005)
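To make the pseudo-code above concrete, here is a minimal sketch of the workaround using the public DSL. The topic name, store name, and the length-based projection are placeholders; the point is only that the source table is not materialized and the projected table is materialized with its own serdes.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ProjectThenMaterializeSketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // No Materialized here, so the source table itself is not forced to materialize.
        final KTable<String, String> source =
            builder.table("topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Project out the unwanted fields (here: keep only the value length) and
        // materialize just the projected table, with different serdes.
        final KTable<String, Long> projected = source.mapValues(
            value -> (long) value.length(),
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("projected-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

        System.out.println(builder.build().describe());
    }
}
{code}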
[GitHub] [kafka] ableegoldman edited a comment on pull request #8964: KAFKA-9450: Decouple flushing state from commiting
ableegoldman edited a comment on pull request #8964: URL: https://github.com/apache/kafka/pull/8964#issuecomment-667306865 Kicked off system tests https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4081 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
vvcephei commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r463867098 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -461,6 +463,42 @@ public void flush() { } } +public void flushCache() { +RuntimeException firstException = null; +// attempting to flush the stores +if (!stores.isEmpty()) { +log.debug("Flushing all store caches registered in the state manager: {}", stores); +for (final StateStoreMetadata metadata : stores.values()) { +final StateStore store = metadata.stateStore; + +try { +// buffer should be flushed to send all records to changelog +if (store instanceof TimeOrderedKeyValueBuffer) { +store.flush(); +} else if (store instanceof CachedStateStore) { +((CachedStateStore) store).flushCache(); +} Review comment: Seems like there's the missing possibility that it's not TimeOrdered or Cached. Should we log a different message than "Flushed cache or buffer" in that case, to indicate we _didn't_ flush it? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -267,80 +266,26 @@ public void handleAssignment(final Map> activeTasks, // check for tasks that were owned previously but have changed active/standby status tasksToRecycle.add(task); } else { -tasksToClose.add(task); -} -} - -for (final Task task : tasksToClose) { -try { -if (task.isActive()) { -// Active tasks are revoked and suspended/committed during #handleRevocation -if (!task.state().equals(State.SUSPENDED)) { -log.error("Active task {} should be suspended prior to attempting to close but was in {}", - task.id(), task.state()); -throw new IllegalStateException("Active task " + task.id() + " should have been suspended"); -} -} else { -task.suspend(); -task.prepareCommit(); -task.postCommit(); -} -completeTaskCloseClean(task); -cleanUpTaskProducer(task, taskCloseExceptions); -tasks.remove(task.id()); -} catch (final RuntimeException e) { -final String uncleanMessage = String.format( -"Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", -task.id()); -log.error(uncleanMessage, e); -taskCloseExceptions.put(task.id(), e); -// We've already recorded the exception (which is the point of clean). -// Now, we should go ahead and complete the close because a half-closed task is no good to anyone. -dirtyTasks.add(task); +tasksToCloseClean.add(task); } } -for (final Task oldTask : tasksToRecycle) { -final Task newTask; -try { -if (oldTask.isActive()) { -if (!oldTask.state().equals(State.SUSPENDED)) { -// Active tasks are revoked and suspended/committed during #handleRevocation -log.error("Active task {} should be suspended prior to attempting to close but was in {}", - oldTask.id(), oldTask.state()); -throw new IllegalStateException("Active task " + oldTask.id() + " should have been suspended"); -} -final Set partitions = standbyTasksToCreate.remove(oldTask.id()); -newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); -cleanUpTaskProducer(oldTask, taskCloseExceptions); -} else { -oldTask.suspend(); -oldTask.prepareCommit(); -oldTask.postCommit(); -final Set partitions = activeTasksToCreate.remove(oldTask.id()); -newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer); -} -tasks.remove(oldTask.id()); -addNewTask(newTask); -} catch (final RuntimeException e) { -final String uncleanMessage = String.format("Failed to recycle task %s cleanly. 
Attempting to close remaining tasks before re-throwing:", oldTask.id()); -log.error(uncleanMessage, e); -
[jira] [Commented] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering
[ https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169187#comment-17169187 ] Sophie Blee-Goldman commented on KAFKA-8159: A related bummer is that you can't do fetches over a range of positive and negative timestamps in a Window/Session store (well you can, you just won't get the behavior you'd expect) I say "related" because we don't use the built-in serdes to serialize the long in a windowed key, we use ByteBuffer#putLong. But it has the same problem with our lexicographical bytes > Built-in serdes for signed numbers do not obey lexicographical ordering > --- > > Key: KAFKA-8159 > URL: https://issues.apache.org/jira/browse/KAFKA-8159 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > Currently we assume consistent ordering between serialized and deserialized > keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes > will also obey bytesA < bytesB < bytesC. This is not true in general of the > built-in serdes for signed numerical types (eg Integer, Long). Specifically, > it is broken by the negative number representations which are > lexicographically greater than (all) positive number representations. > > One consequence of this is that an interactive query of a key range with a > negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1) > will result in "unexpected behavior" depending on the specific store type. > > For RocksDB stores with caching disabled, an empty iterator will be returned > regardless of whether any records do exist in that range. > For in-memory stores and ANY store with caching enabled, Streams will throw > an unchecked exception and crash. -- This message was sent by Atlassian Jira (v8.3.4#803005)
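For anyone wanting to reproduce the underlying issue, a small standalone sketch: the built-in Long serde produces two's-complement big-endian bytes, so -1L serializes to bytes that sort after 1L under the unsigned lexicographic order range scans rely on. The comparator below mirrors that order for illustration.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public class SignedSerdeOrderingSketch {
    // Unsigned lexicographic byte comparison, i.e. the order range scans see.
    static int compareUnsigned(final byte[] a, final byte[] b) {
        for (int i = 0; i < Math.min(a.length, b.length); i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        final Serializer<Long> serializer = Serdes.Long().serializer();
        final byte[] minusOne = serializer.serialize("topic", -1L);
        final byte[] plusOne = serializer.serialize("topic", 1L);
        // -1L < 1L numerically, but its serialized form starts with 0xFF and
        // therefore sorts after 1L lexicographically.
        System.out.println(compareUnsigned(minusOne, plusOne) > 0); // true
    }
}
{code}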
[jira] [Commented] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow
[ https://issues.apache.org/jira/browse/KAFKA-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169189#comment-17169189 ] Guozhang Wang commented on KAFKA-8027: -- We encountered similar issues in our benchmarks, which are based on recent Kafka versions as well. Looking at the profiler graph, there are three big buckets: 1) byte-buffer allocation for concatenating the segmented key from raw key / timestamp. ~10% 2) synchronization on the cache layer to access the cache and get the iterator. ~20% 3) putting all the range keys into a tree-map (i.e. a putAll will be called) before iterating them to achieve thread safety. ~60% Among those, I've had some ideas to optimize 1), and am still digging into how to make 2) / 3) less costly. I will try to prepare a PR, run it through our benchmarks, and post the results here. > Gradual decline in performance of CachingWindowStore provider when number of > keys grow > -- > > Key: KAFKA-8027 > URL: https://issues.apache.org/jira/browse/KAFKA-8027 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Prashant >Priority: Major > Labels: interactivequ, kafka-streams > > We observed this during a performance test of our stream application which > tracks user activity and provides a REST interface to query the window state > store. We used the default configuration of Materialized, i.e. withCachingEnabled, > for storing user behaviour stats in a window state store > (CompositeWindowStore with CachingWindowStore as underlying, which internally > uses RocksDBStore for persistence). > While querying the window store with store.fetch(key, long, long), it internally > tries to fetch the range from ThreadCache, which uses a byte iterator to > search for a key in the cache, and on a cache miss it goes to RocksDBStore for the > result. So, when the number of keys in the cache becomes large, this ThreadCache > search starts taking time (range iterator on all keys), which impacts > WindowStore query performance. > > Workaround: If we disable the cache via the switch on the Materialized instance, i.e. > withCachingDisabled, key search is delegated directly to RocksDBStore, which > is much faster and completes the search in microseconds versus millis in the case of > CachingWindowStore. > > Stats: With unique users > 0.5M, random search for a key i.e. UserId: > > withCachingEnabled : 40 < t < 80ms (upper bound increases as unique users > grow) > withCachingDisabled: t < 1ms (almost constant time) -- This message was sent by Atlassian Jira (v8.3.4#803005)
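For reference, the workaround mentioned in the ticket (disabling caching on the materialized window store) looks roughly like the following in the DSL. Topic and store names are placeholders, and whether disabling the cache is acceptable depends on how much extra downstream and changelog traffic the application can tolerate.
{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class CachingDisabledWindowStoreSketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("user-activity", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("activity-counts")
                .withCachingDisabled()); // bypass ThreadCache lookups on fetch

        System.out.println(builder.build().describe());
    }
}
{code}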
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463880076 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,71 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describes finalized as well as supported features. By default, the request is issued to any + * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions + * parameter. This is particularly useful if the user requires strongly consistent reads of + * finalized features. + * + * The following exceptions can be anticipated when calling {@code get()} on the future from the + * returned {@link DescribeFeaturesResult}: + * + * {@link org.apache.kafka.common.errors.TimeoutException} + * If the request timed out before the describe operation could finish. + * + * + * @param options the options to use + * + * @return the {@link DescribeFeaturesResult} containing the result + */ +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); Review comment: Sorry, I do not understand why should describeFeatures (in post KIP-500) be handled only by controller? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10211) Add DirectoryConfigProvider
[ https://issues.apache.org/jira/browse/KAFKA-10211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169192#comment-17169192 ] David Weber commented on KAFKA-10211: - Here is an example I use in our environment [https://github.com/dweber019/kafka-connect-file-directory-config-provider] > Add DirectoryConfigProvider > --- > > Key: KAFKA-10211 > URL: https://issues.apache.org/jira/browse/KAFKA-10211 > Project: Kafka > Issue Type: Improvement >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > > Add a ConfigProvider which reads secrets from files in a directory, per > [KIP-632|https://cwiki.apache.org/confluence/display/KAFKA/KIP-632%3A+Add+DirectoryConfigProvider]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-667442912 > So, it's just a rebase and there is no change in your PR? The last change to this PR is to rename a class (according to @ijuma’s comment) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
ableegoldman commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-667445902 `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta` failed, all other tests passed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG
ableegoldman commented on pull request #9024: URL: https://github.com/apache/kafka/pull/9024#issuecomment-667447234 Ok, I totally forgot about this PR. Let's just merge the demotion of broker/zookeeper logs and leave streams logs at INFO for now. If that's not sufficient for debugging, e.g. the `EosBetaUpgradeIntegrationTest`, then I'll take another look at debugging the tests that fail when DEBUG logging is turned on. I did take a brief look at the tests; it's not a timeout or excessive logs or anything that seems at all correlated to the log level. It fails on an assertion of the value of `MockProcessorSupplier#capturedProcessors` 🤷‍♀️ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman edited a comment on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG
ableegoldman edited a comment on pull request #9024: URL: https://github.com/apache/kafka/pull/9024#issuecomment-667447234 Ok, I totally forgot about this PR. @guozhangwang @mjsax Let's just merge the demotion of broker/zookeeper logs and leave streams logs at INFO for now. If that's not sufficient for useful debugging (e.g. the `EosBetaUpgradeIntegrationTest`), then I'll take another look at fixing the tests which fail when DEBUG logging is turned on. I did take a brief look at the failing tests; it's not a timeout or excessive logs or anything that seems at all correlated to the log level. It fails on an assertion of the value of `MockProcessorSupplier#capturedProcessors` -- seems pretty bizarre to me 🤷‍♀️ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dhruvilshah3 opened a new pull request #9110: MINOR: Ensure a reason is logged for every segment deletion
dhruvilshah3 opened a new pull request #9110: URL: https://github.com/apache/kafka/pull/9110 This PR improves the logging for segment deletion to ensure that a reason is logged for every segment that is deleted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dhruvilshah3 commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
dhruvilshah3 commented on pull request #8850: URL: https://github.com/apache/kafka/pull/8850#issuecomment-667454222 I attempted to improve the logging further. This also removes the side effect of logging as part of evaluating the predicate. https://github.com/apache/kafka/pull/9110 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-667456986 > Are there any serde use cases that require client id? I observed this issue when we were removing a custom client id config. It caused errors because the metrics of our serde are encoded with the client id. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
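To illustrate the use case being described, here is a hedged sketch of a serializer whose `configure()` reads the resolved `client.id` so that its own metrics can be tagged consistently with the client's metrics. The class and metric naming are invented for the example and are not taken from the PR; the point of KAFKA-10326 is simply that the generated `client.id` should be visible in the `configs` map passed here.

```java
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical serializer that tags its metrics with the client id.
public class ClientIdAwareStringSerializer implements Serializer<String> {

    private String clientId;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Before the fix, a broker-generated client.id was not visible here,
        // even though metric reporters did receive it.
        Object id = configs.get("client.id");
        this.clientId = id == null ? "unknown" : id.toString();
    }

    @Override
    public byte[] serialize(String topic, String data) {
        // e.g. record a metric named "serde-" + clientId + "-bytes-out" (illustrative only)
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}
```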
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463912157 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) { +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final NodeProvider provider = +options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider(); + +Call call = new Call( +"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) { + +@Override +ApiVersionsRequest.Builder createRequest(int timeoutMs) { +return new ApiVersionsRequest.Builder(); +} + +@Override +void handleResponse(AbstractResponse response) { +final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; +if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { +future.complete( +new FeatureMetadata( +apiVersionsResponse.finalizedFeatures(), +apiVersionsResponse.finalizedFeaturesEpoch(), +apiVersionsResponse.supportedFeatures())); +} else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) { +handleNotControllerError(Errors.NOT_CONTROLLER); +} else { +future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); +} +} + +@Override +void handleFailure(Throwable throwable) { +completeAllExceptionally(Collections.singletonList(future), throwable); +} +}; + +runnable.call(call, now); +return new DescribeFeaturesResult(future); +} + +@Override +public UpdateFeaturesResult updateFeatures( +final Map featureUpdates, final UpdateFeaturesOptions options) { +if (featureUpdates == null || featureUpdates.isEmpty()) { +throw new IllegalArgumentException("Feature updates can not be null or empty."); +} +Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null"); + +final Map> updateFutures = new HashMap<>(); +final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData += new UpdateFeaturesRequestData.FeatureUpdateKeyCollection(); +for (Map.Entry entry : featureUpdates.entrySet()) { +final String feature = entry.getKey(); +final FeatureUpdate update = entry.getValue(); +if (feature.trim().isEmpty()) { +throw new IllegalArgumentException("Provided feature can not be null or empty."); +} + +updateFutures.put(feature, new KafkaFutureImpl<>()); +final UpdateFeaturesRequestData.FeatureUpdateKey requestItem = +new UpdateFeaturesRequestData.FeatureUpdateKey(); +requestItem.setFeature(feature); +requestItem.setMaxVersionLevel(update.maxVersionLevel()); +requestItem.setAllowDowngrade(update.allowDowngrade()); +featureUpdatesRequestData.add(requestItem); +} +final UpdateFeaturesRequestData request = new UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData); + +final long now = time.milliseconds(); +final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { + +@Override +UpdateFeaturesRequest.Builder createRequest(int timeoutMs) { +return new UpdateFeaturesRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +final UpdateFeaturesResponse response = +(UpdateFeaturesResponse) abstractResponse; + +// Check for controller change. 
+for (UpdatableFeatureResult result : response.data().results()) { +final Errors error = Errors.forCode(result.errorCode()); +if (error == Errors.NOT_CONTROLLER) { +handleNotControllerError(error); +throw error.exception(); +} +} + +for (UpdatableFeatureResult result : response.data().results()) { +
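As a companion to the (truncated) `KafkaAdminClient` excerpt above, here is a hedged usage sketch of the write path. The types `FeatureUpdate`, `UpdateFeaturesOptions`, and `UpdateFeaturesResult` are the ones introduced by this KIP-584 PR, so their final shape may still change; the constructor and `values()` accessor follow the test code discussed later in this thread.

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;

import java.util.Collections;
import java.util.Map;

public class UpdateFeaturesSketch {
    public static void update(Admin admin) throws Exception {
        // Finalize a hypothetical "my_feature" at max version level 2, without allowing downgrade.
        Map<String, FeatureUpdate> updates =
            Collections.singletonMap("my_feature", new FeatureUpdate((short) 2, false));
        UpdateFeaturesResult result =
            admin.updateFeatures(updates, new UpdateFeaturesOptions());
        // One future per feature: errors such as NOT_CONTROLLER, INVALID_REQUEST, or
        // FEATURE_UPDATE_FAILED surface as exceptions when the future is completed.
        result.values().get("my_feature").get();
    }
}
```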
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463912498 ## File path: clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class FeatureUpdateFailedException extends ApiException { Review comment: This exception corresponds to `Errors.FEATURE_UPDATE_FAILED`. The client can receive this exception during a call to `updateFeatures`, whenever a feature update can not be written to ZK. So this has to be a public error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion
kowshik commented on a change in pull request #9110: URL: https://github.com/apache/kafka/pull/9110#discussion_r463914087 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File, * @param segments The log segments to schedule for deletion * @param asyncDelete Whether the segment files should be deleted asynchronously */ - private def removeAndDeleteSegments(segments: Iterable[LogSegment], asyncDelete: Boolean): Unit = { + private def removeAndDeleteSegments(segments: Iterable[LogSegment], + asyncDelete: Boolean, + reason: SegmentDeletionReason): Unit = { if (segments.nonEmpty) { lock synchronized { // As most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by // removing the deleted segment, we should force materialization of the iterator here, so that results of the // iteration remain valid and deterministic. val toDelete = segments.toList toDelete.foreach { segment => + info(s"${reason.reasonString(this, segment)}") Review comment: If we passed the deletion reason further into the `deleteSegmentFiles` method, it seems we could print the reason string just once for a batch of segments being deleted, and within that reason string provide the reason for deleting the whole batch: https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2519 https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2526 e.g. `info(s"Deleting segments due to $reason: ${segments.mkString(",")}")` where `$reason` reads like `due to retention time 120ms breach`. The drawback is that we sometimes cannot print segment-specific information, since the message is at the batch level. But segment-level deletion logging could bloat our server logs anyway, so it may be better to batch the logging instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463912498 ## File path: clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class FeatureUpdateFailedException extends ApiException { Review comment: This exception corresponds to `Errors.FEATURE_UPDATE_FAILED`. The caller of `AdminClient#updateFeatures` can receive this exception whenever a feature update can not be written to ZK (due to a ZK issue). So this has to be a public error. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463915406 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns a suitable error. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { +if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") +} +// NOTE: Below we set the finalized min version level to be the default minimum version +// level. If the finalized feature already exists, then, this can cause deprecation of all +// version levels in the closed range: +// [existingVersionRange.min(), defaultMinVersionLevel - 1]. +val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) +val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) +val numIncompatibleBrokers = controllerContext.liveOrShuttingDownBrokers.count(broker => { + val singleFinalizedFeature = +Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, newVersionRange))) + BrokerFeatures.hasIncompatibleFeatures(broker.features, singleFinalizedFeature) +}) +if (numIncompatibleBrokers == 0) { + Left(newVersionRange) +} else { + Right( +new ApiError(Errors.INVALID_REQUEST, + s"Could not apply finalized feature update because $numIncompatibleBrokers" + + " brokers were found to have incompatible features.")) +} + } + + /** + * Validate and process a finalized feature update. + * + * If the processing is successful, then, the return value contains: + * 1. the new FinalizedVersionRange for the feature, if the feature update was not meant to delete the feature. + * 2. Option.empty, if the feature update was meant to delete the feature. + * + * If the processing failed, then returned value contains a suitable ApiError. + * + * @param update the feature update to be processed. + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def processFeatureUpdate(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { +val existingFeatures = featureCache.get + .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala) + .getOrElse(Map[String, FinalizedVersionRange]()) + +def newVersionRangeOrError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[Option[FinalizedVersionRange], ApiError] = { + newFinalizedVersionRangeOrIncompatibilityError(update) +.fold(versionRange => Left(Some(versionRange)), error => Right(error)) +} + +if (update.feature.isEmpty) { + // Check that the feature name is not empty. + Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be empty.")) +} else { + val cacheEntry = existingFeatures.get(update.feature).orNull + + // We handle deletion requests separately from non-deletion requests. 
+ if (UpdateFeaturesRequest.isDeleteRequest(update)) { +if (cacheEntry == null) { + // Disallow deletion of a non-existing finalized feature. + Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not delete non-existing finalized feature: '${update.feature}'")) +} else { + Left(Option.empty) +} + } else if (update.maxVersionLevel() < 1) { +// Disallow deletion of a finalized feature without allowDowngrade flag set. +Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not provide maxVersionLevel: ${update.maxVersionLevel} less" + + s" than 1 for feature: '${update.feature}' without setting the" + + " allowDowngrade flag to true in the request.")) + } else { +if (cacheEntry == null) { + newVersionRangeOrError(update) +} else { + if (update.maxVersionLevel == cacheEntry.max()) { +// Disallow a case where target maxVersionLevel matches existing maxVersionLevel. +Right(new ApiError(Errors.INVALID_REQUEST, + s"Can not ${if (update.allowDowngrade) "do
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463915553 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has been upgraded to a newer version that supports the feature versioning + *system (KIP-584). This means the user is upgrading from an earlier version of the broker + *binary. 
In this case, we want to start with no finalized features and allow the user to + *finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + *to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + *features. This process ensures we do not enable all the possible features immediately after + *an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. If absent, it will + *react by creating a FeatureZNode with disabled status and empty finalized features. + *Otherwise, if a node already exists in enabled status then the controller will just + *flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. In such a case, it won’t upgrade all features immediately. + *Instead it will just switch the FeatureZNode status to enabled status. This lets
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463916219 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception { } } +@Test +public void testUpdateFeaturesDuringSuccess() throws Exception { +testUpdateFeatures( +makeTestFeatureUpdates(), +makeTestFeatureUpdateErrors(Errors.NONE)); +} + +@Test +public void testUpdateFeaturesInvalidRequestError() throws Exception { +testUpdateFeatures( +makeTestFeatureUpdates(), +makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST)); +} + +@Test +public void testUpdateFeaturesUpdateFailedError() throws Exception { +testUpdateFeatures( +makeTestFeatureUpdates(), +makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED)); +} + +@Test +public void testUpdateFeaturesPartialSuccess() throws Exception { +final Map errors = makeTestFeatureUpdateErrors(Errors.NONE); +errors.put("test_feature_2", Errors.INVALID_REQUEST); +testUpdateFeatures(makeTestFeatureUpdates(), errors); +} + +private Map makeTestFeatureUpdates() { +return Utils.mkMap( +Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, false)), +Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, true))); +} + +private Map makeTestFeatureUpdateErrors(final Errors error) { +final Map updates = makeTestFeatureUpdates(); +final Map errors = new HashMap<>(); +for (Map.Entry entry : updates.entrySet()) { +errors.put(entry.getKey(), error); +} +return errors; +} + +private void testUpdateFeatures(Map featureUpdates, +Map featureUpdateErrors) throws Exception { +try (final AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().prepareResponse( +body -> body instanceof UpdateFeaturesRequest, +prepareUpdateFeaturesResponse(featureUpdateErrors)); +final Map> futures = env.adminClient().updateFeatures( +featureUpdates, +new UpdateFeaturesOptions().timeoutMs(1)).values(); +for (Map.Entry> entry : futures.entrySet()) { +final KafkaFuture future = entry.getValue(); +final Errors error = featureUpdateErrors.get(entry.getKey()); +if (error == Errors.NONE) { +future.get(); +} else { +final ExecutionException e = assertThrows(ExecutionException.class, +() -> future.get()); Review comment: Isn't that what I'm using currently? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463916470 ## File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + + +/** + * Possible error codes: + * + * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_CONTROLLER} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#FEATURE_UPDATE_FAILED} + */ +public class UpdateFeaturesResponse extends AbstractResponse { + +private final UpdateFeaturesResponseData data; + +public UpdateFeaturesResponse(UpdateFeaturesResponseData data) { +this.data = data; +} + +public UpdateFeaturesResponse(Struct struct) { +final short latestVersion = (short) (UpdateFeaturesResponseData.SCHEMAS.length - 1); +this.data = new UpdateFeaturesResponseData(struct, latestVersion); +} + +public UpdateFeaturesResponse(Struct struct, short version) { +this.data = new UpdateFeaturesResponseData(struct, version); +} + +public Map errors() { +return data.results().valuesSet().stream().collect( +Collectors.toMap( +result -> result.feature(), Review comment: Like how? I don't understand. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463916688 ## File path: clients/src/main/resources/common/message/UpdateFinalizedFeaturesResponse.json ## @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "UpdateFinalizedFeaturesResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code or `0` if there was no error." }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", Review comment: Done now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463916610 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig, */ private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = { try { + val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers + if (config.isFeatureVersioningEnabled) { +def hasIncompatibleFeatures(broker: Broker): Boolean = { + val latestFinalizedFeatures = featureCache.get + if (latestFinalizedFeatures.isDefined) { +BrokerFeatures.hasIncompatibleFeatures(broker.features, latestFinalizedFeatures.get.features) + } else { +false + } +} +controllerContext.liveOrShuttingDownBrokers.foreach(broker => { + if (filteredBrokers.contains(broker.id) && hasIncompatibleFeatures(broker)) { Review comment: If the broker has feature incompatibilities, then it should die as soon as it has received the ZK update (it would die from within `FinalizedFeatureChangeListener`). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
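To make the supported-versus-finalized distinction running through this thread concrete, here is a small hedged sketch using the feature classes referenced in the PR. `Features.finalizedFeatures` and the `FinalizedVersionRange` constructor appear in the diffs above; the `Features.supportedFeatures` factory, the `SupportedVersionRange` constructor, and the package locations are assumptions that may differ in the final patch.

```java
import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.FinalizedVersionRange;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.utils.Utils;

public class FeatureTypesSketch {
    public static void main(String[] args) {
        // A broker advertising support for a hypothetical "group_coordinator" feature, versions 1..5.
        Features<SupportedVersionRange> supported = Features.supportedFeatures(
            Utils.mkMap(Utils.mkEntry("group_coordinator",
                new SupportedVersionRange((short) 1, (short) 5))));

        // The controller finalizing version levels 1..3 cluster-wide; a broker whose supported
        // range does not cover the finalized range would be considered incompatible.
        Features<FinalizedVersionRange> finalized = Features.finalizedFeatures(
            Utils.mkMap(Utils.mkEntry("group_coordinator",
                new FinalizedVersionRange((short) 1, (short) 3))));

        System.out.println("supported=" + supported + " finalized=" + finalized);
    }
}
```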
[GitHub] [kafka] albert02lowis commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…
albert02lowis commented on pull request #9108: URL: https://github.com/apache/kafka/pull/9108#issuecomment-667462930 Hi, there's another unit test that needs to be moved out (`StreamStreamJoinIntegrationTest.shouldNotAccessJoinStoresWhenGivingName`), but I thought of doing that in another PR to keep the PRs small. Let me know if it's ok to just include it in this PR. (Btw, I force-pushed the updated commit with the right user email.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r463916470 ## File path: clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.message.UpdateFeaturesResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + + +/** + * Possible error codes: + * + * - {@link Errors#CLUSTER_AUTHORIZATION_FAILED} + * - {@link Errors#NOT_CONTROLLER} + * - {@link Errors#INVALID_REQUEST} + * - {@link Errors#FEATURE_UPDATE_FAILED} + */ +public class UpdateFeaturesResponse extends AbstractResponse { + +private final UpdateFeaturesResponseData data; + +public UpdateFeaturesResponse(UpdateFeaturesResponseData data) { +this.data = data; +} + +public UpdateFeaturesResponse(Struct struct) { +final short latestVersion = (short) (UpdateFeaturesResponseData.SCHEMAS.length - 1); +this.data = new UpdateFeaturesResponseData(struct, latestVersion); +} + +public UpdateFeaturesResponse(Struct struct, short version) { +this.data = new UpdateFeaturesResponseData(struct, version); +} + +public Map errors() { +return data.results().valuesSet().stream().collect( +Collectors.toMap( +result -> result.feature(), Review comment: Like how? I don't understand. Isn't that what I'm doing currently? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…
abbccdda commented on pull request #9108: URL: https://github.com/apache/kafka/pull/9108#issuecomment-667466886 @albert02lowis I see, let's try to do all of them in one PR then. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
abbccdda commented on a change in pull request #9102: URL: https://github.com/apache/kafka/pull/9102#discussion_r463920622 ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -228,6 +228,13 @@ public Password getPassword(String key) { return copy; } +public Map originals(Map configOverrides) { Review comment: s/originals/copyWithOverride? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
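For reference, a minimal sketch of the semantics the overload under discussion appears to have, and why a name like `copyWithOverride` is suggested. This is inferred from the review thread, not taken from the patch: the originals map is copied and the overrides (e.g. a generated `client.id`) are layered on top before the map is handed to plugins.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only, not the actual AbstractConfig change.
public final class ConfigOverrideSketch {
    public static Map<String, Object> copyWithOverride(Map<String, Object> originals,
                                                       Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(originals);
        merged.putAll(overrides); // overrides win over the original user-supplied values
        return merged;
    }
}
```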
[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions
[ https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169229#comment-17169229 ] John Roesler commented on KAFKA-10336: -- Good thinking, Sophie, that does indeed seem possible. > Rolling upgrade with Suppression AND Standbys may throw exceptions > -- > > Key: KAFKA-10336 > URL: https://issues.apache.org/jira/browse/KAFKA-10336 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: John Roesler >Priority: Blocker > Labels: bug, user-experience > Fix For: 2.7.0 > > > Tl;dr: > If you have standbys AND use Suppress with changelogging enabled, you may > experience exceptions leading to threads shutting down on the OLD instances > during a rolling upgrade. No corruption is expected, and when the rolling > upgrade completes, all threads will be running and processing correctly. > Details: > The Suppression changelog has had to change its internal data format several > times to fix bugs. The binary schema of the changelog values is determined by > a version header on the records, and new versions are able to decode all old > versions' formats. > The suppression changelog decoder is also configured to throw an exception if > it encounters a version number that it doesn't recognize, causing the thread > to stop processing and shut down. > When standbys are configured, there is one so-called "active" worker writing > into the suppression buffer and sending the same messages into the changelog, > while another "standby" worker reads those messages, decodes them, and > maintains a hot-standby replica of the suppression buffer. > If the standby worker is running and older version of Streams than the active > worker, what can happen today is that the active worker may write changelog > messages with a higher version number than the standby worker can understand. > When the standby worker receives one of these messages, it will throw the > exception and shut down its thread. > Note, although the exceptions are undesired, at least this behavior protects > the integrity of the application and prevents data corruption or loss. > Workaround: > Several workarounds are possible: > This only affects clusters that do all of (A) rolling bounce, (B) > suppression, (C) standby replicas, (D) changelogged suppression buffers. > Changing any of those four variables will prevent the issue from occurring. I > would NOT recommend disabling (D), and (B) is probably off the table, since > the application logic presumably depends on it. Therefore, your practical > choices are to disable standbys (C), or to do a full-cluster bounce (A). > Personally, I think (A) is the best option. > Also note, although the exceptions and threads shutting down are not ideal, > they would only afflict the old-versioned nodes. I.e., the nodes you intend > to replace anyway. So another "workaround" is simply to ignore the exceptions > and proceed with the rolling bounce. As the old-versioned nodes are replaced > with new-versioned nodes, the new nodes will again be able to decode their > peers' changelog messages and be able to maintain the hot-standby replicas of > the suppression buffers. > Detection: > Although I really should have anticipated this condition, I first detected it > while expanding our system test coverage as part of KAFKA-10173. 
I added a > rolling upgrade test with an application that uses both suppression and > standby replicas, and observed that the rolling upgrades would occasionally > cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the > rolling-upgrade configuration and only do full-cluster upgrades. Resolving > _this_ ticket will allow us to re-enable rolling upgrades. > Proposed solution: > Part 1: > Since Streams can decode both current and past versions, but not future > versions, we need to implement a mechanism to prevent new-versioned nodes > from writing new-versioned messages, which would appear as future-versioned > messages to the old-versioned nodes. > We have an UPGRADE_FROM configuration that we could leverage to accomplish > this. In that case, when upgrading from 2.3 to 2.4, you would set > UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) > nodes would continue writing messages in the old (2.3) format. Thus, the > still-running old nodes will still be able to read them. > Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. > Post-bounce, the nodes would start writing in the 2.4 format, which is ok > because all the members are running 2.4 at this point and can decode these > messages, even if they are still configured to write with version
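For the first rolling bounce in the proposed solution, the only client-side change would be the Streams `upgrade.from` setting, as sketched below. This is a hedged example of the proposal (the suppression-format behavior was not yet implemented when this was written); the application id and bootstrap servers are illustrative, and the config key is shown as its literal string in case the constant name differs across versions.

```java
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class UpgradeFromSketch {
    public static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative
        // First rolling bounce: new (2.4) binaries would keep writing 2.3-format
        // suppression changelog records so still-running 2.3 nodes can decode them.
        props.put("upgrade.from", "2.3"); // StreamsConfig.UPGRADE_FROM_CONFIG
        return props;
    }
    // Second rolling bounce: remove "upgrade.from" so nodes switch to the new format.
}
```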
[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-667482560 > Btw, I don't think this is an improvement rather than a bug, as we don't have any guarantee to see client id in serdes before. There is another reason: we do pass the generated client id to the metric reporter, so it seems to me that all plugins should see consistent props. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on a change in pull request #9102: URL: https://github.com/apache/kafka/pull/9102#discussion_r463930430 ## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -228,6 +228,13 @@ public Password getPassword(String key) { return copy; } +public Map originals(Map configOverrides) { Review comment: Will copy that This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
dongjinleekr commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-667483663 @tombentley Congratulations! :congratulations: @omkreddy @mimaison Thanks again for the detailed review, as usual! :smiley: This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org