[GitHub] [kafka] inponomarev edited a comment on pull request #9107: KAFKA-5488: KIP-418 implementation

2020-07-31 Thread GitBox


inponomarev edited a comment on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-666749809


   ⚠️ Two differences from the KIP specification, discussion needed ⚠️
   
   1. Instead of multiple overloaded variants of `Branched.with`, we now have 
`Branched.withFunction` and `Branched.withConsumer`. This is because of 
compiler warnings about ambiguous overloads (`Function` and `Consumer` are 
indistinguishable when supplied as lambdas).
   
   2. 'Fully covariant' signatures like `Consumer>` don't work as expected. Used `Consumer>` 
instead
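
   To make point 1 concrete, here is a hedged sketch of how the two factory methods might be used with the new branching API; `split()`, `branch()`, and `defaultBranch()` are assumed from the KIP draft and may differ from what is finally merged.

   ```java
   // Sketch only: everything beyond Branched.withFunction/withConsumer is an assumption
   // based on the KIP draft, not necessarily the merged API.
   import java.util.Map;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.Branched;
   import org.apache.kafka.streams.kstream.Consumed;
   import org.apache.kafka.streams.kstream.KStream;

   public class BranchedSketch {
       public static void main(String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
           final KStream<String, String> stream =
               builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()));

           final Map<String, KStream<String, String>> branches = stream
               .split()
               // withConsumer: the branch is consumed in place (a terminal side effect).
               .branch((k, v) -> v.startsWith("A"), Branched.withConsumer(ks -> ks.to("a-topic")))
               // withFunction: the branch is transformed and returned in the result map.
               .branch((k, v) -> v.startsWith("B"), Branched.withFunction(ks -> ks.mapValues(v -> v.toUpperCase())))
               .defaultBranch();

           System.out.println(branches.keySet());
       }
   }
   ```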
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8328) Kafka smooth expansion

2020-07-31 Thread ChenLin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168514#comment-17168514
 ] 

ChenLin commented on KAFKA-8328:


Thanks for your attention, I will resubmit the patch against the trunk version.

> Kafka smooth expansion
> --
>
> Key: KAFKA-8328
> URL: https://issues.apache.org/jira/browse/KAFKA-8328
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.0
>Reporter: ChenLin
>Priority: Major
>  Labels: Kafka, expansion
> Fix For: 0.10.2.0
>
> Attachments: DiskUtil.png, Kafka_smooth_expansion.patch, 
> producerP999.png
>
>
> When expanding the Kafka cluster, a new follower will read data from the 
> earliest offset. This can result in a large amount of historical data being 
> read from disk, putting a lot of pressure on the disk and affecting the 
> performance of the Kafka service; for example, the producer write latency 
> will increase. In general, Kafka's original expansion mechanism has the 
> following problems:
>    1. The new follower puts a lot of pressure on the disk;
>    2. It causes the producer write latency to increase;
>    3. It causes the consumer read latency to increase.
>     In order to solve these problems, we have proposed a solution for 
> smooth expansion. The main idea of the scheme is that the newly added 
> follower reads data from the HW position, and once the newly added follower 
> has read data up to a certain time threshold or data size threshold, it 
> enters the ISR. Since the new follower reads data from the HW position, 
> most of the data it reads is in the operating system's cache, so it does not 
> put pressure on the disk and does not affect the performance of the Kafka 
> service, thus solving the above problems. 
>     In order to illustrate the problems of the original expansion scheme, 
> we have done some tests, and there are corresponding test charts in the 
> attachment.
> !producerP999.png!
> !DiskUtil.png!  
>  
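
A minimal sketch of the proposed policy follows (not actual broker code); the class name, method names, and thresholds are illustrative assumptions:

{code:java}
// Sketch: a newly added follower starts fetching from the high watermark instead of the
// earliest offset, and is only admitted to the ISR after catching up for long enough or
// after replicating enough data.
final class SmoothExpansionPolicy {
    private final long minCatchUpMillis;
    private final long minReplicatedBytes;

    SmoothExpansionPolicy(final long minCatchUpMillis, final long minReplicatedBytes) {
        this.minCatchUpMillis = minCatchUpMillis;
        this.minReplicatedBytes = minReplicatedBytes;
    }

    long initialFetchOffset(final long logStartOffset, final long highWatermark) {
        // Key idea: fetch from the HW so reads are served from the page cache, not the disk.
        return highWatermark;
    }

    boolean mayJoinIsr(final long millisSinceStart, final long bytesReplicated) {
        return millisSinceStart >= minCatchUpMillis || bytesReplicated >= minReplicatedBytes;
    }
}
{code}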



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8328) Kafka smooth expansion

2020-07-31 Thread ChenLin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168515#comment-17168515
 ] 

ChenLin commented on KAFKA-8328:


Thanks for your attention, I will resubmit the patch against the trunk 
version. [~roncenzhao] [~xilangyan]

> Kafka smooth expansion
> --
>
> Key: KAFKA-8328
> URL: https://issues.apache.org/jira/browse/KAFKA-8328
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 0.10.2.0
>Reporter: ChenLin
>Priority: Major
>  Labels: Kafka, expansion
> Fix For: 0.10.2.0
>
> Attachments: DiskUtil.png, Kafka_smooth_expansion.patch, 
> producerP999.png
>
>
> When expanding the Kafka cluster, a new follower will read data from the 
> earliest offset. This can result in a large amount of historical data being 
> read from disk, putting a lot of pressure on the disk and affecting the 
> performance of the Kafka service; for example, the producer write latency 
> will increase. In general, Kafka's original expansion mechanism has the 
> following problems:
>    1. The new follower puts a lot of pressure on the disk;
>    2. It causes the producer write latency to increase;
>    3. It causes the consumer read latency to increase.
>     In order to solve these problems, we have proposed a solution for 
> smooth expansion. The main idea of the scheme is that the newly added 
> follower reads data from the HW position, and once the newly added follower 
> has read data up to a certain time threshold or data size threshold, it 
> enters the ISR. Since the new follower reads data from the HW position, 
> most of the data it reads is in the operating system's cache, so it does not 
> put pressure on the disk and does not affect the performance of the Kafka 
> service, thus solving the above problems. 
>     In order to illustrate the problems of the original expansion scheme, 
> we have done some tests, and there are corresponding test charts in the 
> attachment.
> !producerP999.png!
> !DiskUtil.png!  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores

2020-07-31 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168521#comment-17168521
 ] 

Bruno Cadonna commented on KAFKA-10137:
---

I also noticed this the last time I looked at 
{{ChangeLoggingWindowBytesStore}}. Since the sequence number is not incremented 
if {{retainDuplicates}} is false, I think it is OK from a correctness point of 
view. 

Maybe the reason why we write the sequence number even when 
{{retainDuplicates}} is false is that this way we do not need to distinguish 
whether or not the key has a sequence number when we read the keys back.
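
To make the key-wrapping idea concrete, here is a minimal sketch (not the actual store code) of a serialized key being suffixed with a 4-byte sequence number that is only incremented when duplicates are retained; the class and method names are illustrative:

{code:java}
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: keys always carry a 4-byte sequence suffix, but the counter only moves when
// retainDuplicates is true, so reads never need to distinguish between two key layouts.
final class DuplicateKeyWrapper {
    private final AtomicInteger seqnum = new AtomicInteger(0);
    private final boolean retainDuplicates;

    DuplicateKeyWrapper(final boolean retainDuplicates) {
        this.retainDuplicates = retainDuplicates;
    }

    byte[] wrap(final byte[] serializedKey) {
        final int seq = retainDuplicates ? seqnum.incrementAndGet() : 0;
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
                .put(serializedKey)
                .putInt(seq)
                .array();
    }
}
{code}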

> Clean-up retain Duplicate logic in Window Stores
> 
>
> Key: KAFKA-10137
> URL: https://issues.apache.org/jira/browse/KAFKA-10137
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Minor
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> The logic to maintain and append the sequence number is present in multiple 
> locations, namely in the changelogging window store and in its underlying 
> window stores. We should consolidate this code to one single location.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-31 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-667010664


   The test failure is unrelated. It's 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest > 
shouldUpgradeFromEosAlphaToEosBeta[true]` again



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10331) MirrorMaker2 active-active replication without topic renaming

2020-07-31 Thread Wojtek Konsek (Jira)
Wojtek Konsek created KAFKA-10331:
-

 Summary: MirrorMaker2 active-active replication without topic 
renaming
 Key: KAFKA-10331
 URL: https://issues.apache.org/jira/browse/KAFKA-10331
 Project: Kafka
  Issue Type: Wish
  Components: mirrormaker
Reporter: Wojtek Konsek


MirrorMaker2 as implemented in Kafka 2.5.x does support active-active 
replication from topic "A" on the source cluster to topic "A" on the 
destination cluster, but if replication is enabled in both directions, the 
messages are duplicated again and again.

Are there any plans to support bi-directional active-active replication, 
replicating from topic A on cluster 1 to topic A on cluster 2, and from topic 
A (same name) on cluster 2 to topic A on cluster 1?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10332) MirrorMaker2 fails to detect topic if remote topic is created first

2020-07-31 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-10332:
---
Summary: MirrorMaker2 fails to detect topic if remote topic is created 
first  (was: MirrorMaker2 fails to detect topic is remote topic is created 
first)

> MirrorMaker2 fails to detect topic if remote topic is created first
> ---
>
> Key: KAFKA-10332
> URL: https://issues.apache.org/jira/browse/KAFKA-10332
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> Setup:
> - 2 clusters: source and target
> - Mirroring data from source to target
> - create a topic called source.mytopic on the target cluster
> - create a topic called mytopic on the source cluster
> At this point, MM2 does not start mirroring the topic.
> This also happens if you delete and recreate a topic that is being mirrored.
> The issue is in 
> [refreshTopicPartitions()|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L211-L232]
>  which basically does a diff between the 2 clusters.
> When the topic is created on the source cluster last, the partition lists of 
> the two clusters already match, so no reconfiguration is triggered.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10332) MirrorMaker2 fails to detect topic is remote topic is created first

2020-07-31 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-10332:
--

 Summary: MirrorMaker2 fails to detect topic is remote topic is 
created first
 Key: KAFKA-10332
 URL: https://issues.apache.org/jira/browse/KAFKA-10332
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.6.0
Reporter: Mickael Maison
Assignee: Mickael Maison


Setup:
- 2 clusters: source and target
- Mirroring data from source to target
- create a topic called source.mytopic on the target cluster
- create a topic called mytopic on the source cluster
At this point, MM2 does not start mirroring the topic.

This also happens if you delete and recreate a topic that is being mirrored.

The issue is in 
[refreshTopicPartitions()|https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L211-L232]
 which basically does a diff between the 2 clusters.
When the topic is created on the source cluster last, the partition lists of 
the two clusters already match, so no reconfiguration is triggered.
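
A rough sketch of the kind of diff check described above, to make the failure mode concrete; the class, method, and parameter names are illustrative and not the actual MirrorSourceConnector code:

{code:java}
import java.util.HashSet;
import java.util.Set;

// Sketch: reconfigure only when the source cluster has topic-partitions the target side
// does not already account for. If "source.mytopic" exists on the target before "mytopic"
// is created on the source, the diff stays empty and mirroring never starts.
final class RefreshSketch {
    static boolean shouldReconfigure(final Set<String> sourceTopicPartitions,
                                     final Set<String> knownTargetTopicPartitions) {
        final Set<String> newPartitions = new HashSet<>(sourceTopicPartitions);
        newPartitions.removeAll(knownTargetTopicPartitions);
        return !newPartitions.isEmpty();
    }
}
{code}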



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10333) Provide an API to retrieve Kafka Connect task configurations

2020-07-31 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-10333:
--

 Summary: Provide an API to retrieve Kafka Connect task 
configurations
 Key: KAFKA-10333
 URL: https://issues.apache.org/jira/browse/KAFKA-10333
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Mickael Maison
Assignee: Mickael Maison


Kafka Connect exposes an API to retrieve configurations from connectors.

Connectors are responsible for creating tasks. When doing so, they have to 
build configurations for individual tasks. In some cases, the configuration can 
be passed as-is to the tasks, but in others it is mutated to make tasks do 
specific work.

For example, with MirrorSourceConnector, the connector configuration has a field 
"topics" which is a list of topics and patterns to mirror. When the connector 
builds task configurations, it resolves the list of topic names and patterns to 
exact partitions and spreads the partitions over all the tasks.
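
As a hedged illustration of that splitting step (not MirrorSourceConnector's actual code), a connector could spread the resolved partitions round-robin over its task configurations; the config key and helper names below are assumptions:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: each task config ends up with its own partition subset, which is exactly the
// per-task detail that the proposed API would make visible.
final class TaskConfigSketch {
    static List<Map<String, String>> taskConfigs(final List<String> resolvedPartitions,
                                                 final int maxTasks) {
        final int numTasks = Math.min(maxTasks, Math.max(1, resolvedPartitions.size()));
        final List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            configs.add(new HashMap<>());
        }
        for (int i = 0; i < resolvedPartitions.size(); i++) {
            configs.get(i % numTasks)
                   .merge("task.assigned.partitions", resolvedPartitions.get(i), (a, b) -> a + "," + b);
        }
        return configs;
    }
}
{code}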

It would be useful to identify the exact configuration each task is given.

For MM2, it would allow identifying the partitions that matched the topics 
field. It would also help in understanding the impact when a task fails.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)



[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r463514179



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   You can use the following to get it right without needing to check for the 
e2e latency before filtering:
   
   ```
   .filter(m -> m.metricName().tags().containsKey(tagKey) &&
       (m.metricName().group().equals(group0100To24) ||
        m.metricName().group().equals(STATE_STORE_LEVEL_GROUP))
   ).collect(Collectors.toList());
   ```
   
   The reason for the difference between the KV store and the window store is 
that they are used in different tests with different numbers of state stores.
   
   The test that uses the KV stores tests three different types of KV stores, 
namely in-memory, rocksdb, and in-memory-lru-cache. For each of these types the 
old group name changes. That is also the reason we need to pass the parameter 
`group0100To24` to `checkKeyValueStoreMetrics()`.
   
   In `checkWindowStoreAndSuppressionBufferMetrics()` we need to filter for 
four groups, because the corresponding test uses suppression and a window state 
store. Suppression buffers had their own group in the old version; in the new 
version they moved into the state store group. Those groups are 
`BUFFER_LEVEL_GROUP_0100_TO_24` and `STATE_STORE_LEVEL_GROUP`. The window state 
store had its own group in the old version, i.e., 
`STATE_STORE_LEVEL_GROUP_ROCKSDB_WINDOW_STORE_0100_TO_24` (we are only using 
RocksDB-based window stores in the test). Finally, during the implementation of 
KIP-444, we discovered that we named a group incorrectly. That's why we also 
filter for the group `stream-rocksdb-window-metrics`.
   
   So to sum up, it is hard to compare the verifications for KV stores and 
window stores, because they are used in different tests. Sorry, I should have 
been clearer on that before. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10334) Transactions not working properly

2020-07-31 Thread Luis Araujo (Jira)
Luis Araujo created KAFKA-10334:
---

 Summary: Transactions not working properly
 Key: KAFKA-10334
 URL: https://issues.apache.org/jira/browse/KAFKA-10334
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.3.0, 2.1.0
Reporter: Luis Araujo


I'm using the transactions provided by the Kafka Producer API in a Scala project 
built with SBT. The dependency used in the project is:

"org.apache.kafka" % "kafka-clients" % "2.1.0"

I followed the documentation and expected transactions to fail when I call 
.commitTransaction if some problem is raised while sending a message, as 
described in the documentation: 
[https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]

Unfortunately, when testing this behaviour with a message larger than the size 
accepted by the Kafka broker/cluster, transactions do not work properly.

I tested with a 3-broker Kafka cluster with a 1MB max message size (the default 
value):
- when the message is 1MB, the transaction is aborted and an exception is 
raised when calling commitTransaction()
- when the message is bigger than 1MB, the transaction completes successfully 
without the message being written; no exception is thrown.

As an example, this means that when I produce 9 messages of 1KB and 1 message 
of 1.1MB in the same transaction, the transaction is completed but only 9 
messages are written to the Kafka cluster.

I tested this behaviour with Kafka versions 2.1.0 and 2.3.0 for both the Kafka 
cluster and the Kafka Producer API.
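
For reference, this is a hedged sketch of the transactional pattern described above (topic name, sizes, and bootstrap address are illustrative); per the report, the failure of the oversized send is expected to surface at commitTransaction() and abort the whole transaction:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class TransactionalSend {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "tx-example");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                for (int i = 0; i < 9; i++) {
                    producer.send(new ProducerRecord<>("tx-topic", "key-" + i, new byte[1024]));
                }
                // A record larger than the broker's max message size; according to the report,
                // this failure is silently swallowed instead of failing the commit.
                producer.send(new ProducerRecord<>("tx-topic", "big", new byte[1_200_000]));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // Simplified handling: a production client would close() on fatal errors
                // such as ProducerFencedException instead of aborting.
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
{code}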



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-31 Thread GitBox


cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463578816



##
File path: 
streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
##
@@ -227,7 +228,8 @@ public MockProcessorContext(final Properties config, final 
TaskId taskId, final
 this.metrics = new StreamsMetricsImpl(
 new Metrics(metricConfig),
 threadId,
-
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG)
+
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
+Time.SYSTEM

Review comment:
   I would prefer to postpone that, because currently it is not strictly 
needed and the time is only used in the RocksDB recording trigger that records 
only internal RocksDB metrics. I do not see how exposing time would be useful 
for users during testing. If anybody complains, we can still do it in a future 
KIP.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10335) Blocking of producer IO thread when calling send() from callback

2020-07-31 Thread Alexander Sibiryakov (Jira)
Alexander Sibiryakov created KAFKA-10335:


 Summary: Blocking of producer IO thread when calling send() from 
callback
 Key: KAFKA-10335
 URL: https://issues.apache.org/jira/browse/KAFKA-10335
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Reporter: Alexander Sibiryakov


We had an application which was supposed to use KafkaProducer to deliver the 
results of some work. Sometimes delivery wasn't successful because of network 
connectivity errors or maintenance happening on the broker side. In such cases 
we wanted the application to send an event with the error and the original 
message details. All good, but we wanted errors to be delivered to a separate 
topic. So we implemented a callback in the send() method, using the same 
producer instance and calling send() from there.

This application worked for some time, but then we found that its producer was 
stuck: almost no CPU consumption and batches expiring for hours. After 
connecting a debugger it turned out that the sender IO thread was blocking. 
When a record expired, its callback was called, which contained a call to 
send() targeting a new topic whose metadata was not present in the producer's 
client cache. When send() is missing metadata, it is allowed to block for up to 
the max.block.ms interval, which is 60 seconds by default. If the application 
is active, this quickly results in a large number of accumulated records, and 
every record blocks the IO thread for 60s, so the sender IO thread is 
essentially blocked. In the producer, only the sender IO thread calls the 
client's poll() method, which is responsible for all network communication and 
metadata updates. If poll() is executed with significant delay, it results in 
errors connected with various timeouts. So we've got a stuck producer with 
little chance to recover.

To summarise, the prerequisites for the problem are sending from a callback, 
using the same producer instance, and using a topic which wasn't seen before.

I think it is important to decide whether the issue is KafkaProducer misuse or 
a bug. Code in callbacks shouldn't block, that is clear, but at the same time, 
no one expects an already-initialised producer to block.

Depending on the decision I could produce a fix; it could be either a warning 
when a user tries to call send() from a callback, or a reduction of the maximum 
allowed blocking time for metadata updates. It could be just docs changes, or 
even nothing.

Here is code to reproduce the issue; the output it produces follows the code 
snippet. Tested on Confluent Cloud, from my desktop with a 100 Mbps connection.
{code:java}
public static void main(String[] args) throws IOException {
    byte[] blob = new byte[262144];
    Properties properties = new Properties();
    properties.load(new FileReader("kafka-staging.properties"));
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    properties.setProperty("request.timeout.ms", "5000");
    properties.setProperty("delivery.timeout.ms", "5000");
    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties);
    while (true) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>("alex-test-valid-data", blob);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.err.println(exception);
                    long start = System.currentTimeMillis();
                    ProducerRecord<String, byte[]> errorRecord = new ProducerRecord<>("alex-test-errors", blob);
                    producer.send(errorRecord);  // blocking caused by metadata update
                    long timeElapsed = System.currentTimeMillis() - start;
                    System.err.println("time spent blocking IO thread: " + timeElapsed);
                }
            }
        });
    }
}
{code}
{noformat}
[2020-07-31 14:35:51,936: INFO/main] (AbstractConfig.java:347) - ProducerConfig 
values: 
acks = 1
batch.size = 16384
bootstrap.servers = [pkc-l915e.europe-west1.gcp.confluent.cloud:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = 
compression.type = none
connections.max.idle.ms = 54
delivery.timeout.ms = 5000
enable.idempotence = false
interceptor.classes = []
key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 6
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata

[GitHub] [kafka] cadonna commented on a change in pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-31 Thread GitBox


cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
 throw new ProcessorStateException(fatal);
 }
 
-// Setup metrics before the database is opened, otherwise the metrics 
are not updated
+// Setup statistics before the database is opened, otherwise the 
statistics are not updated
 // with the measurements from Rocks DB
-maybeSetUpMetricsRecorder(configs);
+maybeSetUpStatistics(configs);
 
 openRocksDB(dbOptions, columnFamilyOptions);
 open = true;
+
+addValueProvidersToMetricsRecorder(configs);
 }
 
-private void maybeSetUpMetricsRecorder(final Map configs) {
-if (userSpecifiedOptions.statistics() == null &&
+private void maybeSetUpStatistics(final Map configs) {
+if (userSpecifiedOptions.statistics() != null) {
+userSpecifiedStatistics = true;
+}
+if (!userSpecifiedStatistics &&
 RecordingLevel.forName((String) 
configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-isStatisticsRegistered = true;
 // metrics recorder will clean up statistics object
 final Statistics statistics = new Statistics();
 userSpecifiedOptions.setStatistics(statistics);
-metricsRecorder.addStatistics(name, statistics);
+}
+}
+
+private void addValueProvidersToMetricsRecorder(final Map 
configs) {
+final TableFormatConfig tableFormatConfig = 
userSpecifiedOptions.tableFormatConfig();
+final Statistics statistics = userSpecifiedStatistics ? null : 
userSpecifiedOptions.statistics();
+if (tableFormatConfig instanceof 
BlockBasedTableConfigWithAccessibleCache) {
+final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) 
tableFormatConfig).blockCache();
+metricsRecorder.addValueProviders(name, db, cache, statistics);
+} else {
+metricsRecorder.addValueProviders(name, db, null, statistics);
+log.warn("A table format configuration is used that does not 
expose the block cache. This means " +
+"that metrics that relate to the block cache may be wrong if 
the block cache is shared.");
 }

Review comment:
   I agree with you that it is not ideal and thank you for this lesson on 
reflection. 
   
   Indeed, I do not like reflection in this case, because it makes the code too 
dependent on RocksDB internals. We should use reflection to check whether the 
public API to configure RocksDB has changed in a newer version, but that is 
another story.
   
   I do not understand how the alternative of wrapping `BlockBasedTableConfig` 
into `BlockBasedTableConfigWithAccessibleCache` should work. Since the cache is 
not accessible in `BlockBasedTableConfig` it will also not be accessible when 
it is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the name). 
We need to get the reference to the cache when the cache is set in 
`BlockBasedTableConfig`. If the cache is already set we can only use reflection.
   
   Since the block based table format is the only format in RocksDB that uses 
the cache, I do not see why a user absolutely needs to pass a new 
`BlockBasedTableConfig` object. I think for now it is OK to log a warning, and 
clearly document that the provided `BlockBasedTableConfig` object should be 
used.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-07-31 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-667147526


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-07-31 Thread GitBox


abbccdda commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-667173698


   retest this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes

2020-07-31 Thread GitBox


hachikuji opened a new pull request #9109:
URL: https://github.com/apache/kafka/pull/9109


   Add some notable changes to the reassignment tool for the 2.6 release.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-31 Thread GitBox


abbccdda commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r463682515



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -4068,6 +4093,58 @@ public void testListOffsetsMetadataNonRetriableErrors() 
throws Exception {
 }
 }
 
+@Test
+public void testListOffsetsPartialResponse() throws Exception {

Review comment:
   Good coverage

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -949,13 +953,27 @@ public void onFailure(RuntimeException e) {
 leader, tp);
 partitionsToRetry.add(tp);
 } else {
-partitionDataMap.put(tp, new 
ListOffsetRequest.PartitionData(offset, leaderAndEpoch.epoch));
+int currentLeaderEpoch = 
leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH);
+partitionDataMap.put(tp, new ListOffsetPartition()
+.setPartitionIndex(tp.partition())
+.setTimestamp(offset)
+.setCurrentLeaderEpoch(currentLeaderEpoch));
 }
 }
 }
 return regroupPartitionMapByNode(partitionDataMap);
 }
 
+private static List 
toListOffsetTopics(Map timestampsToSearch) 
{

Review comment:
   Let's move this helper into `ListOffsetRequest`

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##
@@ -47,96 +42,11 @@
 public static final int CONSUMER_REPLICA_ID = -1;
 public static final int DEBUGGING_REPLICA_ID = -2;
 
-// top level fields
-private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
-"Broker id of the follower. For normal consumers, use -1.");
-private static final Field.Int8 ISOLATION_LEVEL = new 
Field.Int8("isolation_level",
-"This setting controls the visibility of transactional records. " +
-"Using READ_UNCOMMITTED (isolation_level = 0) makes all 
records visible. With READ_COMMITTED " +
-"(isolation_level = 1), non-transactional and COMMITTED 
transactional records are visible. " +
-"To be more concrete, READ_COMMITTED returns all data from 
offsets smaller than the current " +
-"LSO (last stable offset), and enables the inclusion of 
the list of aborted transactions in the " +
-"result, which allows consumers to discard ABORTED 
transactional records");
-private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("topics",
-"Topics to list offsets.");
-
-// topic level fields
-private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
-"Partitions to list offsets.");
-
-// partition level fields
-private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
-"The target timestamp for the partition.");
-private static final Field.Int32 MAX_NUM_OFFSETS = new 
Field.Int32("max_num_offsets",
-"Maximum offsets to return.");
-
-private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-PARTITION_ID,
-TIMESTAMP,
-MAX_NUM_OFFSETS);
-
-private static final Field TOPICS_V0 = TOPICS.withFields(
-TOPIC_NAME,
-PARTITIONS_V0);
-
-private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(
-REPLICA_ID,
-TOPICS_V0);
-
-// V1 removes max_num_offsets
-private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-PARTITION_ID,
-TIMESTAMP);
-
-private static final Field TOPICS_V1 = TOPICS.withFields(
-TOPIC_NAME,
-PARTITIONS_V1);
-
-private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(
-REPLICA_ID,
-TOPICS_V1);
-
-// V2 adds a field for the isolation level
-private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
-REPLICA_ID,
-ISOLATION_LEVEL,
-TOPICS_V1);
-
-// V3 bump used to indicate that on quota violation brokers send out 
responses before throttling.
-private static final Schema LIST_OFFSET_REQUEST_V3 = 
LIST_OFFSET_REQUEST_V2;
-
-// V4 introduces the current leader epoch, which is used for fencing
-private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
-PARTITION_ID,
-CURRENT_LEADER_EPOCH,
-TIMESTAMP);
-
-private static final Field TOPICS_V4 = TOPICS.withFields(
-TOPIC_NAME,
-PARTITIONS_V4);
-
-private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema(
-REPLICA_ID,
-ISOLATION_LEVEL,
-TOPICS_V4);
-
-// V5 b

[jira] [Created] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)
John Roesler created KAFKA-10336:


 Summary: Rolling upgrade with Suppression AND Standbys may throw 
exceptions
 Key: KAFKA-10336
 URL: https://issues.apache.org/jira/browse/KAFKA-10336
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: John Roesler


Tl;dr:

If you have standbys AND use Suppress with changelogging enabled, you may 
experience exceptions leading to threads shutting down on the OLD instances 
during a rolling upgrade. No corruption is expected, and when the rolling 
upgrade completes, all threads will be running and processing correctly.

Details:

The Suppression changelog has had to change its internal data format several 
times to fix bugs. The binary schema of the changelog values is determined by a 
version header on the records, and new versions are able to decode all old 
versions' formats.

The suppression changelog decoder is also configured to throw an exception if 
it encounters a version number that it doesn't recognize, causing the thread to 
stop processing and shut down.

When standbys are configured, there is one so-called "active" worker writing 
into the suppression buffer and sending the same messages into the changelog, 
while another "standby" worker reads those messages, decodes them, and 
maintains a hot-standby replica of the suppression buffer.

If the standby worker is running an older version of Streams than the active 
worker, what can happen today is that the active worker may write changelog 
messages with a higher version number than the standby worker can understand. 
When the standby worker receives one of these messages, it will throw the 
exception and shut down its thread.

Note, although the exceptions are undesired, at least this behavior protects 
the integrity of the application and prevents data corruption or loss.

Workaround:

Several workarounds are possible:

This only affects clusters that do all of (A) rolling bounce, (B) suppression, 
(C) standby replicas, (D) changelogged suppression buffers. Changing any of 
those four variables will prevent the issue from occurring. I would NOT 
recommend disabling (D), and (B) is probably off the table, since the 
application logic presumably depends on it. Therefore, your practical choices 
are to disable standbys (C), or to do a full-cluster bounce (A). Personally, I 
think (A) is the best option.

Also note, although the exceptions and threads shutting down are not ideal, 
they would only afflict the old-versioned nodes. I.e., the nodes you intend to 
replace anyway. So another "workaround" is simply to ignore the exceptions and 
proceed with the rolling bounce. As the old-versioned nodes are replaced with 
new-versioned nodes, the new nodes will again be able to decode their peers' 
changelog messages and be able to maintain the hot-standby replicas of the 
suppression buffers.

Detection:

Although I really should have anticipated this condition, I first detected it 
while expanding our system test coverage as part of KAFKA-10173. I added a 
rolling upgrade test with an application that uses both suppression and standby 
replicas, and observed that the rolling upgrades would occasionally cause the 
old nodes to crash. Accordingly, in KAFKA-10173, I disabled the rolling-upgrade 
configuration and only do full-cluster upgrades. Resolving _this_ ticket will 
allow us to re-enable rolling upgrades.

Proposed solution:

Part 1:

Since Streams can decode both current and past versions, but not future 
versions, we need to implement a mechanism to prevent new-versioned nodes from 
writing new-versioned messages, which would appear as future-versioned messages 
to the old-versioned nodes.

We have an UPGRADE_FROM configuration that we could leverage to accomplish 
this. In that case, when upgrading from 2.3 to 2.4, you would set UPGRADE_FROM 
to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) nodes would 
continue writing messages in the old (2.3) format. Thus, the still-running old 
nodes will still be able to read them.

Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
because all the members are running 2.4 at this point and can decode these 
messages, even if they are still configured to write with version 2.3.

After the second rolling bounce, the whole cluster is both running 2.4 and 
writing with the 2.4 format.

Part 2:

Managing two rolling bounces can be difficult, so it is also desirable to 
implement a mechanism for automatically negotiating the schema version 
internally.

In fact, this is already present in Streams, and it is called "version 
probing". Right now, version probing is used to enable the exact same kind of 
transition from an old-message-format to a new-message-format when both old and 
new members are in the cluster, but it is only 

[jira] [Commented] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-31 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168977#comment-17168977
 ] 

John Roesler commented on KAFKA-10322:
--

Hi all,

Thanks for your insights [~ableegoldman] !

I realized after reading your comment that I'd never written down the full 
extent of the idea you referenced. I've just documented it here: 
https://issues.apache.org/jira/browse/KAFKA-10336

It seems like we could use the same general mechanism to provide an upgrade 
path to a version of Streams that solves this issue.

Thanks,

-John

> InMemoryWindowStore restore keys format incompatibility (lack of 
> sequenceNumber in keys on topic)
> -
>
> Key: KAFKA-10322
> URL: https://issues.apache.org/jira/browse/KAFKA-10322
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
> Environment: windows/linux
>Reporter: Tomasz Bradło
>Priority: Major
>
> I have a regular groupBy-and-count stream configuration:
> {code:java}
> fun addStream(kStreamBuilder: StreamsBuilder) {
>     val storeSupplier = Stores.inMemoryWindowStore("count-store",
>         Duration.ofDays(10),
>         Duration.ofDays(1),
>         false)
>     val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
>         .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())
>     kStreamBuilder
>         .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>         .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
>         .groupByKey()
>         .windowedBy(TimeWindows.of(Duration.ofDays(1)))
>         .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
>         .toStream()
>         .to("topic1-count")
>     val storeConsumed = Consumed.with(
>         WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
>         Serdes.Long())
>     kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
> }{code}
> While sending to "topic1-count", the key is serialized with 
> [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], 
> which uses 
> [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], 
> so the message key format is:
> {code:java}
> real_grouping_key + timestamp(8bytes){code}
> Everything works and I can get correct values from the state store. But in a 
> recovery scenario, when 
> [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] 
> enters the offset < highWatermark loop, the 
> [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] 
> reads from "topic1-count" and fails to extract a valid key and timestamp using 
> [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] 
> and 
> [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. 
> It fails because it expects the format:
> {code:java}
> real_grouping_key + timestamp(8bytes) + sequence_number(4bytes){code}
> How is this supposed to work in this case?
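
To make the mismatch concrete, here is a minimal sketch of the two layouts described above (sizes taken from the report; the helper names are illustrative, not Streams code):

{code:java}
import java.nio.ByteBuffer;

// Sketch: the writer produces key + 8-byte timestamp, while the restore path expects
// key + 8-byte timestamp + 4-byte sequence number.
final class WindowKeyLayouts {
    static byte[] writtenKey(final byte[] groupingKey, final long windowStartMs) {
        return ByteBuffer.allocate(groupingKey.length + Long.BYTES)
                .put(groupingKey).putLong(windowStartMs).array();
    }

    static byte[] expectedStoreKey(final byte[] groupingKey, final long windowStartMs, final int seqnum) {
        return ByteBuffer.allocate(groupingKey.length + Long.BYTES + Integer.BYTES)
                .put(groupingKey).putLong(windowStartMs).putInt(seqnum).array();
    }
}
{code}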



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Priority: Blocker  (was: Major)

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Priority: Blocker
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version 2.3.
> After the second rolling bounce, the whole cluster is both running 2.4 and 
> writing with the 2.4 format.
> Part 2:
> Managing two rolling bounces can be difficult, so it is also desirable to 
>

[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Affects Version/s: 2.6.0
   2.3.0
   2.4.0
   2.5.0

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version 2.3.
> After the second rolling bounce, t

[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10336:
-
Fix Version/s: 2.7.0

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version 2.3.
> After the second rolling bounce, the whole cluster is both running 2.4 and 
> writing with the 2.4 format.
> Part 2:
> Managing two rolling bounces can be difficult, so it is 

[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168979#comment-17168979
 ] 

John Roesler commented on KAFKA-10336:
--

I've upgraded this to a blocker for 2.7.0, so that we won't forget about it. It 
was a regression in 2.3.0 when we changed the suppression buffer format the 
first time, but we didn't detect it because of a testing gap.

However, it's a pretty serious issue, and will only become more impactful as 
more people use suppression and as we make other internal topic format changes, 
for example in the fix for https://issues.apache.org/jira/browse/KAFKA-10322
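
For readers following along, a minimal sketch of the two-bounce procedure from the 
proposal above, assuming the public StreamsConfig upgrade.from setting (the value 
string "2.3" is taken from the ticket; the class and config values around it are 
illustrative only):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromSketch {
    // First rolling bounce: new binaries keep writing old-format messages.
    static Properties firstBounce() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.3");
        return props;
    }

    // Second rolling bounce: drop upgrade.from so nodes switch to the new format.
    static Properties secondBounce() {
        final Properties props = firstBounce();
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        return props;
    }
}
{code}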

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> sti

[GitHub] [kafka] rhauch commented on a change in pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes

2020-07-31 Thread GitBox


rhauch commented on a change in pull request #9109:
URL: https://github.com/apache/kafka/pull/9109#discussion_r463705431



##
File path: docs/upgrade.html
##
@@ -49,6 +49,14 @@ Notable changes in 2
 Fetch requests and other requests intended only for the leader or 
follower return NOT_LEADER_OR_FOLLOWER(6) instead of REPLICA_NOT_AVAILABLE(9)
 if the broker is not a replica, ensuring that this transient error 
during reassignments is handled by all clients as a retriable exception.
 
+There are several notable changes to the reassignment tool 
kafka-reassign-partitions.sh
+following the completion of
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment";>KIP-455.
+This tool now requires the --additional flag to be 
providing when changing the throttle of an

Review comment:
   Nit: fix grammar:
   
   ```suggestion
   This tool now requires the --additional flag to be 
provided when changing the throttle of an
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


vvcephei commented on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-667212439


   Sounds good, @guozhangwang . Another option is to make an internal config so 
that we can parameterize the benchmarks and get a more thorough understanding 
of the impact of this change.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-07-31 Thread GitBox


abbccdda commented on a change in pull request #9078:
URL: https://github.com/apache/kafka/pull/9078#discussion_r463708722



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##
@@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() {
 for (Map.Entry entry : 
this.metrics.entrySet()) {
 String attribute = entry.getKey();
 KafkaMetric metric = entry.getValue();
+String metricType = double.class.getName();
+
+try {
+metricType = metric.metricValue().getClass().getName();
+} catch (NullPointerException e) {

Review comment:
   It is weird to catch NPE, could we just check whether `metricValue` is 
defined?
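
   For illustration, a minimal sketch of the null-check alternative (a standalone 
helper, not the actual JmxReporter code; the fallback to double mirrors the diff 
above):

```java
import org.apache.kafka.common.metrics.KafkaMetric;

final class MetricTypeSketch {
    // Fall back to the double type when the metric value is not yet available,
    // instead of catching NullPointerException.
    static String metricTypeOf(final KafkaMetric metric) {
        final Object value = metric.metricValue();
        return value == null ? double.class.getName() : value.getClass().getName();
    }
}
```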





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei edited a comment on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


vvcephei edited a comment on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-667212439


   Sounds good, @guozhangwang . Another option is to make an internal config so 
that we can parameterize the benchmarks and get a more thorough understanding 
of the impact of this change.
   
   Of course, we can easily transition from the hard-coded value to an internal 
config in a small follow-on PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9105: MINOR: closable object Memory leak prevention

2020-07-31 Thread GitBox


abbccdda commented on a change in pull request #9105:
URL: https://github.com/apache/kafka/pull/9105#discussion_r463708981



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -197,8 +197,8 @@ synchronized boolean lock(final TaskId taskId) throws 
IOException {
 
 final FileChannel channel;
 
-try {
-channel = getOrCreateFileChannel(taskId, lockFile.toPath());
+try (final FileChannel fileChannel = getOrCreateFileChannel(taskId, 
lockFile.toPath())) {

Review comment:
   It looks weird to use the resource outside of the try block; does this 
really work?
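
   For illustration, a standalone sketch (not the Streams code) of why this is 
problematic for locking: try-with-resources closes the channel when the block 
exits, and closing a FileChannel releases any FileLock acquired through it, so a 
channel assigned out of the block is no longer useful.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class LockSketch {
    public static void main(final String[] args) throws IOException {
        final FileChannel channel;
        try (FileChannel c = FileChannel.open(Paths.get("example.lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel = c;
            final FileLock lock = c.tryLock();
            System.out.println("locked inside try: " + (lock != null));
        } // the channel, and any lock held through it, is closed here
        System.out.println("channel open after try? " + channel.isOpen()); // false
    }
}
```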





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-31 Thread GitBox


vvcephei merged pull request #8993:
URL: https://github.com/apache/kafka/pull/8993


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-31 Thread GitBox


vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-667214395


   Thanks @guozhangwang ! I've merged to 2.5. If we do a 2.5.1 RC1, it'll be 
included. Otherwise, it'll go into 2.5.2. Either way, it will run as part of 
nightly branch tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-31 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-667233872


   @chia7712 : So, it's just a rebase and there is no change in your PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9109: MINOR: Add notes for 2.6 on reassignment tool changes

2020-07-31 Thread GitBox


hachikuji merged pull request #9109:
URL: https://github.com/apache/kafka/pull/9109


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-07-31 Thread GitBox


abbccdda commented on pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#issuecomment-667241038


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463738138



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks,
 addNewTask(task);
 }
 }
+}
+
+private void handleCloseAndRecycle(final List tasksToRecycle,
+   final List tasksToCloseClean,
+   final List tasksToCloseDirty,
+   final Map> 
activeTasksToCreate,
+   final Map> 
standbyTasksToCreate,
+   final LinkedHashMap taskCloseExceptions) {
+if (!tasksToCloseDirty.isEmpty()) {
+throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+}
+
+// for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+final List tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+tasksToCheckpoint.addAll(tasksToRecycle);
+for (final Task task : tasksToCheckpoint) {
+try {
+// Always try to first suspend and commit the task before 
checkpointing it;
+// some tasks may already be suspended which should be a no-op.
+//
+// Also since active tasks should already be suspended / 
committed and
+// standby tasks should have no offsets to commit, we should 
expect nothing to commit
+task.suspend();
+
+// Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+// 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+//and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+// 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+//write the checkpoint file.
+final Map offsets = 
task.prepareCommit();

Review comment:
   Alternatively, now that we enforce checkpoint during suspension, we 
could just remove the `pre/postCommit` for active tasks in `handleAssignment`. 
It just seems nice to be able to assert that we never call `pre/postCommit` 
after a task is suspended





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169038#comment-17169038
 ] 

Sophie Blee-Goldman commented on KAFKA-10336:
-

Thanks for the detailed bug report. Quick question – are you sure this is only 
possible when standbys are enabled? It seems like you could end up in a 
situation where a suppression task ends up on an upgraded instance, which then 
completes restoration and starts writing to the changelog with the newer 
protocol. Then during a subsequent rebalance of the rolling upgrade, this task 
gets migrated back to an old instance, tries to read the new protocol version, 
and dies.

There's some consolation here: this case is probably pretty rare, at least 
relative to the case with standbys enabled.

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPG

[jira] [Commented] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores

2020-07-31 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169045#comment-17169045
 ] 

Sophie Blee-Goldman commented on KAFKA-10137:
-

Yeah I thought that might be the case, although I'm not sure I agree with the 
reasoning behind it. You shouldn't be switching `retainDuplicates` on and off 
for an existing store; the changelog bytes should match the local store format. 

Maybe the theory was that users might try to build two stores off of the same 
changelog, one with duplicates and one without. I don't think we should support 
that either.

On the other hand it's reasonable to write the output of a windowed aggregation 
to a topic and then use that as the source topic for a table/store/global store 
with or without duplicates. Unfortunately that is exactly [the 
case|https://issues.apache.org/jira/browse/KAFKA-10322] which is broken by this 
"bug" (whether intentional or not)

> Clean-up retain Duplicate logic in Window Stores
> 
>
> Key: KAFKA-10137
> URL: https://issues.apache.org/jira/browse/KAFKA-10137
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Priority: Minor
>
> Stream-stream joins use the regular `WindowStore` implementation but with 
> `retainDuplicates` set to true. To allow for duplicates while using the same 
> unique-key underlying stores we just wrap the key with an incrementing 
> sequence number before inserting it.
> The logic to maintain and append the sequence number is present in multiple 
> locations, namely in the changelogging window store and in its underlying 
> window stores. We should consolidate this code to one single location.  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r463750941



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   I see, thanks for the explanation.  The suggestion worked





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10336:
--
Labels: bug user-experience  (was: )

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: bug, user-experience
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running an older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version 2.3.
> After the second rolling bounce, the whole cluster is both runn

[GitHub] [kafka] ableegoldman commented on a change in pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-07-31 Thread GitBox


ableegoldman commented on a change in pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#discussion_r463752139



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -144,15 +144,35 @@ private boolean allSubscriptionsEqual(Set 
allTopics,
 return true;
 }
 
+
+/**
+ * This constrainedAssign optimizes the assignment algorithm when all 
consumers were subscribed to same set of topics.
+ * The method includes the following steps:
+ *
+ * 1. Reassign as many previously owned partitions as possible

Review comment:
   ```suggestion
* 1. Reassign as many previously owned partitions as possible, up to 
the maxQuota
   ```
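
   For context, a tiny sketch of how per-consumer quotas like the maxQuota 
mentioned here are commonly derived (an illustration, not the assignor's exact 
code):

```java
final class QuotaSketch {
    // minQuota = floor(partitions / consumers), maxQuota = ceil(partitions / consumers);
    // at most (partitions % consumers) members can end up holding maxQuota partitions.
    static int[] quotas(final int numPartitions, final int numConsumers) {
        final int minQuota = numPartitions / numConsumers;
        final int maxQuota = (numPartitions + numConsumers - 1) / numConsumers;
        return new int[] {minQuota, maxQuota};
    }
}
```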





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10186) Aborting transaction with pending data should throw non-fatal exception

2020-07-31 Thread John Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169052#comment-17169052
 ] 

John Thomas commented on KAFKA-10186:
-

[~ableegoldman] If we abort a transaction with any non-flushed data, we want to 
throw a different exception, since we know it's non-fatal?

If my understanding is correct, in Sender#maybeSendAndPollTransactionalRequest:
transactionManager.hasAbortableError() -> this is fatal, while
transactionManager.isAborting() -> this is something we know was aborted, and is 
recoverable.

 
{code:java}
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
    if (accumulator.hasIncomplete()) {
        RuntimeException exception = transactionManager.lastError();
        if (exception == null) {
            exception = new KafkaException("Failing batch since transaction was aborted");
        }
        accumulator.abortUndrainedBatches(exception);
    }
}
{code}
PS : #newbie !

 

> Aborting transaction with pending data should throw non-fatal exception
> ---
>
> Key: KAFKA-10186
> URL: https://issues.apache.org/jira/browse/KAFKA-10186
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> Currently if you try to abort a transaction with any pending (non-flushed) 
> data, the send exception is set to
> {code:java}
>  KafkaException("Failing batch since transaction was aborted"){code}
> This exception type is generally considered fatal, but this is a valid state 
> to be in -- the point of throwing the exception is to alert that the records 
> will not be sent, not that you are in an unrecoverable error state.
> We should throw a different (possibly new) type of exception here to 
> distinguish from fatal and recoverable errors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10316) Consider renaming getter method for Interactive Queries

2020-07-31 Thread John Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169062#comment-17169062
 ] 

John Thomas commented on KAFKA-10316:
-

[~mjsax]  The vote has been open for >72 hours, and the KIP got a binding +1 (John 
Roesler), non-binding +1s (Navinder Brar, Jorge Esteban, Bruno Cadonna), and no 
-1 votes.

I need a little help figuring this out. Will the approval be of type 1 or 2?
 # The [KIP process point 
4|https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-Process]
 says : The criteria for acceptance is [lazy 
majority|https://cwiki.apache.org/confluence/display/KAFKA/Bylaws#Bylaws-Approvals].
 
 # At the same time in the 
[Bylaws|https://cwiki.apache.org/confluence/display/KAFKA/Bylaws#Bylaws-Approvals]
 for a "code change action" approval "one +1 from a committer who has not 
authored the patch followed by a Lazy approval (not counting the vote of the 
contributor), moving to lazy majority if a -1 is received" .

> Consider renaming getter method for Interactive Queries
> ---
>
> Key: KAFKA-10316
> URL: https://issues.apache.org/jira/browse/KAFKA-10316
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: John Thomas
>Priority: Minor
>  Labels: beginner, need-kip, newbie
>
> In the 2.5 release, we introduce new classes for Interactive Queries via 
> KIP-535 (cf https://issues.apache.org/jira/browse/KAFKA-6144). The KIP did 
> not specify the names for getter methods of `KeyQueryMetadata` explicitly and 
> they were added in the PR as `getActiveHost()`, `getStandbyHosts()`, and 
> `getPartition()`.
> However, in Kafka it is custom to not use the `get` prefix for getters and 
> thus the methods should have been added as `activeHost()`, `standbyHosts()`, 
> and `partition()`, respectively.
> We should consider renaming the methods accordingly, by deprecating the 
> existing ones and adding the new ones in parallel.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


guozhangwang commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-667297400


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


guozhangwang commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-667297606


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Johnny-Malizia commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-31 Thread GitBox


Johnny-Malizia commented on pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#issuecomment-667297977


   Apologies for my inactivity here, I had to take a little more time to 
understand how/when the index files are currently being opened. I'll try to 
have this resolved sometime this weekend. 
   
   My understanding matches what @junrao stated. I have now pushed up a change 
that I believe implements what we are all after. I think the implementation 
makes sense, but if anybody here knows a better way to do this please let me 
know.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463780418



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks,
 addNewTask(task);
 }
 }
+}
+
+private void handleCloseAndRecycle(final List tasksToRecycle,
+   final List tasksToCloseClean,
+   final List tasksToCloseDirty,
+   final Map> 
activeTasksToCreate,
+   final Map> 
standbyTasksToCreate,
+   final LinkedHashMap taskCloseExceptions) {
+if (!tasksToCloseDirty.isEmpty()) {
+throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+}
+
+// for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+final List tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+tasksToCheckpoint.addAll(tasksToRecycle);
+for (final Task task : tasksToCheckpoint) {
+try {
+// Always try to first suspend and commit the task before 
checkpointing it;
+// some tasks may already be suspended which should be a no-op.
+//
+// Also since active tasks should already be suspended / 
committed and
+// standby tasks should have no offsets to commit, we should 
expect nothing to commit
+task.suspend();
+
+// Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+// 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+//and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+// 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+//write the checkpoint file.
+final Map offsets = 
task.prepareCommit();

Review comment:
   I'm leaning towards the second option here: in `handleAssignment` we 
actually do not commit at all, but only 1) use `preCommit` to validate certain 
states, and 2) use `postCommit` while expecting it to be a no-op. Let me 
see if I can get rid of those two in `handleAssignment`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


ableegoldman commented on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-667306865


   Kicked off system tests 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4080/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463790190



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -368,7 +313,96 @@ public void handleAssignment(final Map> activeTasks,
 addNewTask(task);
 }
 }
+}
+
+private void handleCloseAndRecycle(final List tasksToRecycle,
+   final List tasksToCloseClean,
+   final List tasksToCloseDirty,
+   final Map> 
activeTasksToCreate,
+   final Map> 
standbyTasksToCreate,
+   final LinkedHashMap taskCloseExceptions) {
+if (!tasksToCloseDirty.isEmpty()) {
+throw new IllegalArgumentException("Tasks to close-dirty should be 
empty");
+}
+
+// for all tasks to close or recycle, we should first right a 
checkpoint as in post-commit
+final List tasksToCheckpoint = new 
ArrayList<>(tasksToCloseClean);
+tasksToCheckpoint.addAll(tasksToRecycle);
+for (final Task task : tasksToCheckpoint) {
+try {
+// Always try to first suspend and commit the task before 
checkpointing it;
+// some tasks may already be suspended which should be a no-op.
+//
+// Also since active tasks should already be suspended / 
committed and
+// standby tasks should have no offsets to commit, we should 
expect nothing to commit
+task.suspend();
+
+// Note that we are not actually committing here but just 
check if we need to write checkpoint file:
+// 1) for active tasks prepareCommit should return empty if it 
has committed during suspension successfully,
+//and their changelog positions should not change at all 
postCommit would not write the checkpoint again.
+// 2) for standby tasks prepareCommit should always return 
empty, and then in postCommit we would probably
+//write the checkpoint file.
+final Map offsets = 
task.prepareCommit();

Review comment:
   I tried a few approaches and ended up explicitly calling suspend / 
postCommit for `standby` tasks only, and using `prepareCommit` to check whether 
the previous revocation failed. Personally I'm happy with the current 
workflow now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9107: KAFKA-5488: KIP-418 implementation

2020-07-31 Thread GitBox


vvcephei commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-667352270


   Hey @mjsax , do you have time to give this a first pass?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rgroothuijsen commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-07-31 Thread GitBox


rgroothuijsen commented on a change in pull request #9078:
URL: https://github.com/apache/kafka/pull/9078#discussion_r463845316



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##
@@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() {
 for (Map.Entry entry : 
this.metrics.entrySet()) {
 String attribute = entry.getKey();
 KafkaMetric metric = entry.getValue();
+String metricType = double.class.getName();
+
+try {
+metricType = metric.metricValue().getClass().getName();
+} catch (NullPointerException e) {

Review comment:
   @abbccdda That's what I figured as well at first, and it passes in unit 
tests, but there's a strange side effect. Upon starting `connect-distributed`, 
a whole bunch of NPEs appear for various metrics. It looks like when I call 
`metricValue()`, it eventually arrives 
[here](https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1811),
 and throws an error because `member` is null. I could also do a null check 
there, though I'm not sure what value the method would return in that case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7777) Decouple topic serdes from materialized serdes

2020-07-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169176#comment-17169176
 ] 

Guozhang Wang commented on KAFKA-:
--

Another workaround for now is to use `KTable#mapValues()`, in which we can 
project out the unwanted fields and then materialize. More specifically, you 
can write:

builder.table("topic", Consumed)     // no Materialized here, so the source table is not forced to be materialized
   .mapValues(..., Materialized)     // project out the unwanted fields, then materialize with the new serde

As a result only one store would be created for the resulting KTable after the 
mapValues.
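
A minimal runnable sketch of that workaround (topic name, serdes, and the 
projection are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ProjectThenMaterialize {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // No Materialized on the source table, so the topic format is not forced into a store.
        final KTable<String, String> source =
            builder.table("topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Project out unwanted fields (here: keep only the first CSV column) and
        // materialize only the projected value, with its own serde.
        source.mapValues(
            value -> value.split(",")[0],
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("projected-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
    }
}
{code}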

> Decouple topic serdes from materialized serdes
> --
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Reporter: Maarten
>Priority: Minor
>  Labels: needs-kip
>
> It would be valuable to us to have the the encoding format in a Kafka topic 
> decoupled from the encoding format used to cache the data locally in a kafka 
> streams app. 
> We would like to use the `range()` function in the interactive queries API to 
> look up a series of results, but can't with our encoding scheme due to our 
> keys being variable length.
> We use protobuf, but based on what I've read Avro, Flatbuffers and Cap'n 
> proto have similar problems.
> Currently we use the following code to work around this problem:
> {code}
> builder
> .stream("input-topic", Consumed.with(inputKeySerde, inputValueSerde))
> .to("intermediate-topic", Produced.with(intermediateKeySerde, 
> intermediateValueSerde)); 
> t1 = builder
> .table("intermediate-topic", Consumed.with(intermediateKeySerde, 
> intermediateValueSerde), t1Materialized);
> {code}
> With the encoding formats decoupled, the code above could be reduced to a 
> single step, not requiring an intermediate topic.
> Based on feedback on my [SO 
> question|https://stackoverflow.com/questions/53913571/is-there-a-way-to-separate-kafka-topic-serdes-from-materialized-serdes]
>  a change that introduces this would impact state restoration when using an 
> input topic for recovery.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman edited a comment on pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


ableegoldman edited a comment on pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#issuecomment-667306865


   Kicked off system tests 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4081



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-31 Thread GitBox


vvcephei commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r463867098



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -461,6 +463,42 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state 
manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to 
changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {
+store.flush();
+} else if (store instanceof CachedStateStore) {
+((CachedStateStore) store).flushCache();
+}

Review comment:
   Seems like there's the missing possibility that it's not TimeOrdered or 
Cached. Should we log a different message than "Flushed cache or buffer" in 
that case, to indicate we _didn't_ flush it?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -267,80 +266,26 @@ public void handleAssignment(final Map> activeTasks,
 // check for tasks that were owned previously but have changed 
active/standby status
 tasksToRecycle.add(task);
 } else {
-tasksToClose.add(task);
-}
-}
-
-for (final Task task : tasksToClose) {
-try {
-if (task.isActive()) {
-// Active tasks are revoked and suspended/committed during 
#handleRevocation
-if (!task.state().equals(State.SUSPENDED)) {
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  task.id(), task.state());
-throw new IllegalStateException("Active task " + 
task.id() + " should have been suspended");
-}
-} else {
-task.suspend();
-task.prepareCommit();
-task.postCommit();
-}
-completeTaskCloseClean(task);
-cleanUpTaskProducer(task, taskCloseExceptions);
-tasks.remove(task.id());
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format(
-"Failed to close task %s cleanly. Attempting to close 
remaining tasks before re-throwing:",
-task.id());
-log.error(uncleanMessage, e);
-taskCloseExceptions.put(task.id(), e);
-// We've already recorded the exception (which is the point of 
clean).
-// Now, we should go ahead and complete the close because a 
half-closed task is no good to anyone.
-dirtyTasks.add(task);
+tasksToCloseClean.add(task);
 }
 }
 
-for (final Task oldTask : tasksToRecycle) {
-final Task newTask;
-try {
-if (oldTask.isActive()) {
-if (!oldTask.state().equals(State.SUSPENDED)) {
-// Active tasks are revoked and suspended/committed 
during #handleRevocation
-log.error("Active task {} should be suspended prior to 
attempting to close but was in {}",
-  oldTask.id(), oldTask.state());
-throw new IllegalStateException("Active task " + 
oldTask.id() + " should have been suspended");
-}
-final Set partitions = 
standbyTasksToCreate.remove(oldTask.id());
-newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
-cleanUpTaskProducer(oldTask, taskCloseExceptions);
-} else {
-oldTask.suspend();
-oldTask.prepareCommit();
-oldTask.postCommit();
-final Set partitions = 
activeTasksToCreate.remove(oldTask.id());
-newTask = 
activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, 
partitions, mainConsumer);
-}
-tasks.remove(oldTask.id());
-addNewTask(newTask);
-} catch (final RuntimeException e) {
-final String uncleanMessage = String.format("Failed to recycle 
task %s cleanly. Attempting to close remaining tasks before re-throwing:", 
oldTask.id());
-log.error(uncleanMessage, e);
-  

[jira] [Commented] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2020-07-31 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169187#comment-17169187
 ] 

Sophie Blee-Goldman commented on KAFKA-8159:


A related bummer is that you can't do fetches over a range of positive and 
negative timestamps in a Window/Session store (well, you can, you just won't get 
the behavior you'd expect).

I say "related" because we don't use the built-in serdes to serialize the long 
in a windowed key; we use ByteBuffer#putLong. But it has the same problem with 
our lexicographic byte ordering.
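
A small standalone sketch of the underlying problem (pure illustration, not 
Streams code): the encoded bytes of negative longs compare greater than those of 
positive longs when ordered lexicographically as unsigned bytes.

{code:java}
import java.nio.ByteBuffer;

public class SignedOrderSketch {
    static byte[] encode(final long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Unsigned lexicographic comparison, the order a byte-based store typically uses.
    static int compareUnsigned(final byte[] a, final byte[] b) {
        for (int i = 0; i < a.length && i < b.length; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(final String[] args) {
        // -1L encodes as 0xFF...FF, which sorts after 1L's 0x00...01,
        // even though -1 < 1 numerically.
        System.out.println(compareUnsigned(encode(-1L), encode(1L)) > 0); // true
    }
}
{code}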

> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Currently we assume consistent ordering between serialized and deserialized 
> keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
> will also obey bytesA < bytesB < bytesC. This is not true in general of the 
> built-in serdes for signed numerical types (eg Integer, Long). Specifically, 
> it is broken by the negative number representations which are 
> lexicographically greater than (all) positive number representations. 
>  
> One consequence of this is that an interactive query of a key range with a 
> negative lower bound and positive upper bound (eg keyValueStore.range(-1, 1) 
> will result in "unexpected behavior" depending on the specific store type.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned 
> regardless of whether any records do exist in that range. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8027) Gradual decline in performance of CachingWindowStore provider when number of keys grow

2020-07-31 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169189#comment-17169189
 ] 

Guozhang Wang commented on KAFKA-8027:
--

We encountered similar issues in our benchmarks, which are based on recent Kafka 
versions as well. Looking at the profiler graph, there are three big buckets:

1) byte-buffer allocation for concatenating the segmented key from the raw key / 
timestamp. ~10%
2) synchronization on the cache layer to access the cache and get the iterator. ~20%
3) putting all the range keys into a tree-map (i.e. a putAll will be called) 
before iterating them to achieve thread safety. ~60%

Among those, I've had some ideas to optimize 1), and am still digging into how 
to make 2) / 3) less costly. I will try to prepare a PR in our benchmarks and 
post the results here.
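
For reference, a minimal sketch of the withCachingDisabled workaround mentioned in 
the ticket below (store name, serdes, and window size are placeholders):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class CachingDisabledSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("user-activity", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("user-activity-counts")
                .withCachingDisabled()); // fetches are delegated directly to RocksDB
    }
}
{code}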

> Gradual decline in performance of CachingWindowStore provider when number of 
> keys grow
> --
>
> Key: KAFKA-8027
> URL: https://issues.apache.org/jira/browse/KAFKA-8027
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Prashant
>Priority: Major
>  Labels: interactivequ, kafka-streams
>
> We observed this during a performance test of our stream application which 
> tracks user's activity and provides REST interface to query the window state 
> store.  We used default configuration of Materialized i.e. withCachingEnabled 
> for storing user behaviour stats in a window state store 
> (CompositeWindowStore with CachingWindowStore as the underlying store, which 
> internally uses RocksDBStore for persistence).  
> While querying window store with store.fetch(key, long, long), it internally 
> tries to fetch the range from ThreadCache which uses a byte iterator to 
> search for a key in cache and on a cache miss it goes to RocksDBStore for 
> result. So, when number of keys in cache becomes large this ThreadCache 
> search starts taking time (range Iterator on all keys) which impacts 
> WindowStore query performance.
>  
> Workaround: If we disable cache with switch on Materialized instance i.e. 
> withCachingDisabled, key search is delegated directly to RocksDBStore which 
> is way faster and completed search in microseconds against millis in case of 
> CachingWindowStore.  
>  
> Stats: With Unique users > 0.5M, random search for a key i.e. UserId:
>  
> withCachingEnabled: 40 < t < 80ms (upper bound increases as unique users 
> grow)
> withCachingDisabled: t < 1ms (almost constant time)
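
For readers looking for the workaround in code, a minimal sketch of disabling 
caching on a windowed count store (topic and store names are made up; the 
withCachingDisabled call is the only point being illustrated):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class CachingDisabledExample {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("user-activity", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               // withCachingDisabled() delegates fetches directly to RocksDB,
               // avoiding the ThreadCache range scan described above.
               .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("user-stats")
                                  .withCachingDisabled());
        return builder.build();
    }
}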



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463880076



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1215,71 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection<ClientQuotaAlteration> entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker. It can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter. This is particularly useful if the user requires strongly 
consistent reads of
+ * finalized features.
+ * <p>
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * <ul>
+ *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.</li>
+ * </ul>
+ * <p>
+ * @param options   the options to use
+ *
+ * @return  the {@link DescribeFeaturesResult} containing the 
result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);

Review comment:
   Sorry, I do not understand: why should describeFeatures (post KIP-500) be 
handled only by the controller?
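
   For context, a minimal client-side usage sketch of the API under review, assuming 
it lands roughly as in this diff. The bootstrap address is a placeholder, and the 
accessor names sendRequestToController(boolean) and featureMetadata() are 
assumptions inferred from the getter used in the diff, not something the PR shows:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.kafka.clients.admin.FeatureMetadata;

public class DescribeFeaturesExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Direct the request to the controller for a strongly consistent read,
            // per the javadoc above.
            DescribeFeaturesOptions options =
                new DescribeFeaturesOptions().sendRequestToController(true);

            // featureMetadata() is assumed as the result accessor; the merged API may differ.
            FeatureMetadata metadata = admin.describeFeatures(options).featureMetadata().get();
            System.out.println("Finalized and supported features: " + metadata);
        }
    }
}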
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10211) Add DirectoryConfigProvider

2020-07-31 Thread David Weber (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169192#comment-17169192
 ] 

David Weber commented on KAFKA-10211:
-

Here is an example I use in our environment 
[https://github.com/dweber019/kafka-connect-file-directory-config-provider]
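
For readers unfamiliar with KIP-632, the idea can be sketched in a few lines: each 
regular file in a directory becomes one config key (the file name) and value (the 
file content). The class below is illustrative only and is not the implementation 
in the KIP or in the repository linked above:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

public class SimpleDirectoryConfigProvider implements ConfigProvider {

    @Override
    public void configure(Map<String, ?> configs) {
        // No provider-level configuration needed for this sketch.
    }

    // `path` is a directory; each regular file becomes one key/value pair.
    @Override
    public ConfigData get(String path) {
        Map<String, String> data = new HashMap<>();
        try (Stream<Path> files = Files.list(Paths.get(path))) {
            files.filter(Files::isRegularFile).forEach(file ->
                data.put(file.getFileName().toString(), readTrimmed(file)));
        } catch (IOException e) {
            throw new RuntimeException("Failed to read config directory " + path, e);
        }
        return new ConfigData(data);
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            data.put(key, readTrimmed(Paths.get(path, key)));
        }
        return new ConfigData(data);
    }

    private static String readTrimmed(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new RuntimeException("Failed to read " + file, e);
        }
    }

    @Override
    public void close() {
    }
}

Such a provider is wired in through the standard config provider indirection, e.g. 
config.providers=dir and config.providers.dir.class=<the provider class>, and values 
are then referenced as ${dir:/path/to/secrets:key}.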

> Add DirectoryConfigProvider
> ---
>
> Key: KAFKA-10211
> URL: https://issues.apache.org/jira/browse/KAFKA-10211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>
> Add a ConfigProvider which reads secrets from files in a directory, per 
> [KIP-632|https://cwiki.apache.org/confluence/display/KAFKA/KIP-632%3A+Add+DirectoryConfigProvider].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-31 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-667442912


   > So, it's just a rebase and there is no change in your PR?
   
   The last change to this PR is to rename a class (according to @ijuma’s 
comment)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-31 Thread GitBox


ableegoldman commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-667445902


   `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta` failed, 
all other tests passed



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-07-31 Thread GitBox


ableegoldman commented on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-667447234


   Ok I totally forgot about this PR.
   
   Let's just merge the demotion of broker/zookeeper logs and leave streams 
logs at INFO for now. If that's not sufficient for debugging eg the 
`EosBetaUpgradeIntegrationTest` then I'll take another look at debugging the 
tests that fail when DEBUG logging is turned on.
   
   I did take a brief look at the tests, it's not a timeout or excessive logs 
or anything that seems at all correlated to the log level. It fails on an 
assertion of the value of `MockProcessorSupplier#capturedProcessors` 🤷‍♀️ 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-07-31 Thread GitBox


ableegoldman edited a comment on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-667447234


   Ok I totally forgot about this PR.
   
   @guozhangwang @mjsax Let's just merge the demotion of broker/zookeeper logs 
and leave streams logs at INFO for now. If that's not sufficient for useful 
debugging (eg the `EosBetaUpgradeIntegrationTest`) then I'll take another look 
at fixing the tests which fail when DEBUG logging is turned on.
   
   I did take a brief look at the failing tests, it's not a timeout or 
excessive logs or anything that seems at all correlated to the log level. It 
fails on an assertion of the value of 
`MockProcessorSupplier#capturedProcessors` -- seems pretty bizarre to me 🤷‍♀️



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 opened a new pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-07-31 Thread GitBox


dhruvilshah3 opened a new pull request #9110:
URL: https://github.com/apache/kafka/pull/9110


   This PR improves the logging for segment deletion to ensure that a reason is 
logged for every segment that is deleted.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dhruvilshah3 commented on pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-07-31 Thread GitBox


dhruvilshah3 commented on pull request #8850:
URL: https://github.com/apache/kafka/pull/8850#issuecomment-667454222


   I attempted to improve the logging further. This also removes the side 
effect of logging as part of evaluating the predicate. 
https://github.com/apache/kafka/pull/9110



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-07-31 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-667456986


   > Are there any serde use cases that require client id?
   
   I observed this issue when we were removing a custom client id config. It 
caused an error because the metrics of our serde are encoded with the client id. 
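
   To illustrate the use case: a (hypothetical) serializer that names its metrics 
after the client id would rely on seeing the generated client.id in configure(), 
which is what this PR makes consistent:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.serialization.Serializer;

public class ClientIdAwareSerializer implements Serializer<String> {

    private String clientId;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // With this change the generated client id should be visible here even when
        // the user did not set client.id explicitly.
        Object id = configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
        this.clientId = id == null ? "unknown" : id.toString();
        // e.g. register serde metrics named/tagged with clientId here.
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}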



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912157



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
 return new 
AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {
+final KafkaFutureImpl<FeatureMetadata> future = new 
KafkaFutureImpl<>();
+final long now = time.milliseconds();
+final NodeProvider provider =
+options.sendRequestToController() ? new ControllerNodeProvider() : 
new LeastLoadedNodeProvider();
+
+Call call = new Call(
+"describeFeatures", calcDeadlineMs(now, options.timeoutMs()), 
provider) {
+
+@Override
+ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+return new ApiVersionsRequest.Builder();
+}
+
+@Override
+void handleResponse(AbstractResponse response) {
+final ApiVersionsResponse apiVersionsResponse = 
(ApiVersionsResponse) response;
+if (apiVersionsResponse.data.errorCode() == 
Errors.NONE.code()) {
+future.complete(
+new FeatureMetadata(
+apiVersionsResponse.finalizedFeatures(),
+apiVersionsResponse.finalizedFeaturesEpoch(),
+apiVersionsResponse.supportedFeatures()));
+} else if (options.sendRequestToController() && 
apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+handleNotControllerError(Errors.NOT_CONTROLLER);
+} else {
+future.completeExceptionally(
+
Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+}
+}
+
+@Override
+void handleFailure(Throwable throwable) {
+completeAllExceptionally(Collections.singletonList(future), 
throwable);
+}
+};
+
+runnable.call(call, now);
+return new DescribeFeaturesResult(future);
+}
+
+@Override
+public UpdateFeaturesResult updateFeatures(
+final Map<String, FeatureUpdate> featureUpdates, final 
UpdateFeaturesOptions options) {
+if (featureUpdates == null || featureUpdates.isEmpty()) {
+throw new IllegalArgumentException("Feature updates can not be 
null or empty.");
+}
+Objects.requireNonNull(options, "UpdateFeaturesOptions can not be 
null");
+
+final Map<String, KafkaFutureImpl<Void>> updateFutures = new 
HashMap<>();
+final UpdateFeaturesRequestData.FeatureUpdateKeyCollection 
featureUpdatesRequestData
+= new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+for (Map.Entry<String, FeatureUpdate> entry : 
featureUpdates.entrySet()) {
+final String feature = entry.getKey();
+final FeatureUpdate update = entry.getValue();
+if (feature.trim().isEmpty()) {
+throw new IllegalArgumentException("Provided feature can not 
be null or empty.");
+}
+
+updateFutures.put(feature, new KafkaFutureImpl<>());
+final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+new UpdateFeaturesRequestData.FeatureUpdateKey();
+requestItem.setFeature(feature);
+requestItem.setMaxVersionLevel(update.maxVersionLevel());
+requestItem.setAllowDowngrade(update.allowDowngrade());
+featureUpdatesRequestData.add(requestItem);
+}
+final UpdateFeaturesRequestData request = new 
UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+final long now = time.milliseconds();
+final Call call = new Call("updateFeatures", calcDeadlineMs(now, 
options.timeoutMs()),
+new ControllerNodeProvider()) {
+
+@Override
+UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+return new UpdateFeaturesRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+final UpdateFeaturesResponse response =
+(UpdateFeaturesResponse) abstractResponse;
+
+// Check for controller change.
+for (UpdatableFeatureResult result : 
response.data().results()) {
+final Errors error = Errors.forCode(result.errorCode());
+if (error == Errors.NOT_CONTROLLER) {
+handleNotControllerError(error);
+throw error.exception();
+}
+}
+
+for (UpdatableFeatureResult result : 
response.data().results()) {
+   

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912498



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FeatureUpdateFailedException extends ApiException {

Review comment:
   This exception corresponds to `Errors.FEATURE_UPDATE_FAILED`. The client 
can receive this exception during a call to `updateFeatures`, whenever a 
feature update can not be written to ZK. So this has to be a public error.
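
   As an illustration of what a public error means for callers, a sketch of 
handling it on the client side, assuming the API lands as in this PR (the feature 
name and bootstrap address are placeholders):

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;

public class UpdateFeaturesErrorHandling {

    public static void main(String[] args) throws InterruptedException {
        Map<String, Object> conf = Collections.<String, Object>singletonMap(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(conf)) {
            Map<String, FeatureUpdate> updates =
                Collections.singletonMap("some_feature", new FeatureUpdate((short) 2, false));
            Map<String, KafkaFuture<Void>> futures =
                admin.updateFeatures(updates, new UpdateFeaturesOptions()).values();

            try {
                futures.get("some_feature").get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof FeatureUpdateFailedException) {
                    // The controller accepted the request but could not persist the
                    // new finalized feature to ZK.
                    System.err.println("Feature update failed: " + e.getCause().getMessage());
                }
            }
        }
    }
}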





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9110: MINOR: Ensure a reason is logged for every segment deletion

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9110:
URL: https://github.com/apache/kafka/pull/9110#discussion_r463914087



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2227,14 +2210,17 @@ class Log(@volatile private var _dir: File,
* @param segments The log segments to schedule for deletion
* @param asyncDelete Whether the segment files should be deleted 
asynchronously
*/
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment], 
asyncDelete: Boolean): Unit = {
+  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
+  asyncDelete: Boolean,
+  reason: SegmentDeletionReason): Unit = {
 if (segments.nonEmpty) {
   lock synchronized {
 // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
 // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
 // iteration remain valid and deterministic.
 val toDelete = segments.toList
 toDelete.foreach { segment =>
+  info(s"${reason.reasonString(this, segment)}")

Review comment:
   If we passed the deletion reason further into the 
`deleteSegmentFiles` method, it seems we could print the reason string just once 
for a batch of segments being deleted. And within the reason string, we can 
provide the reason for deleting the batch:
   
   
https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2519
   
https://github.com/confluentinc/ce-kafka/blob/master/core/src/main/scala/kafka/log/Log.scala#L2526
   
   ex: `info(s"Deleting segments due to $reason: ${segments.mkString(",")}")`
   
   where `$reason` provides `due to retention time 120ms breach`.
   
   The drawback is that sometimes we cannot print segment-specific information 
since the message is at the batch level. But segment-level deletion information 
could bloat our server logging anyway, so it may be better to batch the logging 
instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912498



##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/FeatureUpdateFailedException.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class FeatureUpdateFailedException extends ApiException {

Review comment:
   This exception corresponds to `Errors.FEATURE_UPDATE_FAILED`. The caller 
of `AdminClient#updateFeatures` can receive this exception whenever a feature 
update can not be written to ZK (due to a ZK issue). So this has to be a public 
error.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463915406



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1647,6 +1844,188 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  /**
+   * Returns the new FinalizedVersionRange for the feature, if there are no 
feature
+   * incompatibilities seen with all known brokers for the provided feature 
update.
+   * Otherwise returns a suitable error.
+   *
+   * @param update   the feature update to be processed (this can not be meant 
to delete the feature)
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def newFinalizedVersionRangeOrIncompatibilityError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, 
ApiError] = {
+if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+  throw new IllegalArgumentException(s"Provided feature update can not be 
meant to delete the feature: $update")
+}
+// NOTE: Below we set the finalized min version level to be the default 
minimum version
+// level. If the finalized feature already exists, then, this can cause 
deprecation of all
+// version levels in the closed range:
+// [existingVersionRange.min(), defaultMinVersionLevel - 1].
+val defaultMinVersionLevel = 
brokerFeatures.defaultMinVersionLevel(update.feature)
+val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, 
update.maxVersionLevel)
+val numIncompatibleBrokers = 
controllerContext.liveOrShuttingDownBrokers.count(broker => {
+  val singleFinalizedFeature =
+Features.finalizedFeatures(Utils.mkMap(Utils.mkEntry(update.feature, 
newVersionRange)))
+  BrokerFeatures.hasIncompatibleFeatures(broker.features, 
singleFinalizedFeature)
+})
+if (numIncompatibleBrokers == 0) {
+  Left(newVersionRange)
+} else {
+  Right(
+new ApiError(Errors.INVALID_REQUEST,
+ s"Could not apply finalized feature update because 
$numIncompatibleBrokers" +
+ " brokers were found to have incompatible features."))
+}
+  }
+
+  /**
+   * Validate and process a finalized feature update.
+   *
+   * If the processing is successful, then, the return value contains:
+   * 1. the new FinalizedVersionRange for the feature, if the feature update 
was not meant to delete the feature.
+   * 2. Option.empty, if the feature update was meant to delete the feature.
+   *
+   * If the processing failed, then returned value contains a suitable 
ApiError.
+   *
+   * @param update   the feature update to be processed.
+   *
+   * @return the new FinalizedVersionRange or error, as described 
above.
+   */
+  private def processFeatureUpdate(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+val existingFeatures = featureCache.get
+  .map(featuresAndEpoch => featuresAndEpoch.features.features().asScala)
+  .getOrElse(Map[String, FinalizedVersionRange]())
+
+def newVersionRangeOrError(update: 
UpdateFeaturesRequestData.FeatureUpdateKey): 
Either[Option[FinalizedVersionRange], ApiError] = {
+  newFinalizedVersionRangeOrIncompatibilityError(update)
+.fold(versionRange => Left(Some(versionRange)), error => Right(error))
+}
+
+if (update.feature.isEmpty) {
+  // Check that the feature name is not empty.
+  Right(new ApiError(Errors.INVALID_REQUEST, "Feature name can not be 
empty."))
+} else {
+  val cacheEntry = existingFeatures.get(update.feature).orNull
+
+  // We handle deletion requests separately from non-deletion requests.
+  if (UpdateFeaturesRequest.isDeleteRequest(update)) {
+if (cacheEntry == null) {
+  // Disallow deletion of a non-existing finalized feature.
+  Right(new ApiError(Errors.INVALID_REQUEST,
+ s"Can not delete non-existing finalized feature: 
'${update.feature}'"))
+} else {
+  Left(Option.empty)
+}
+  } else if (update.maxVersionLevel() < 1) {
+// Disallow deletion of a finalized feature without allowDowngrade 
flag set.
+Right(new ApiError(Errors.INVALID_REQUEST,
+   s"Can not provide maxVersionLevel: 
${update.maxVersionLevel} less" +
+   s" than 1 for feature: '${update.feature}' without 
setting the" +
+   " allowDowngrade flag to true in the request."))
+  } else {
+if (cacheEntry == null) {
+  newVersionRangeOrError(update)
+} else {
+  if (update.maxVersionLevel == cacheEntry.max()) {
+// Disallow a case where target maxVersionLevel matches existing 
maxVersionLevel.
+Right(new ApiError(Errors.INVALID_REQUEST,
+   s"Can not ${if (update.allowDowngrade) 
"do

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463915553



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -266,6 +275,178 @@ class KafkaController(val config: KafkaConfig,
 }
   }
 
+  private def createFeatureZNode(newNode: FeatureZNode): Int = {
+info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$newNode")
+zkClient.createFeatureZNode(newNode)
+val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path)
+newVersion
+  }
+
+  private def updateFeatureZNode(updatedNode: FeatureZNode): Int = {
+info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: 
$updatedNode")
+zkClient.updateFeatureZNode(updatedNode)
+  }
+
+  /**
+   * This method enables the feature versioning system (KIP-584).
+   *
+   * Development in Kafka (from a high level) is organized into features. Each 
feature is tracked by
+   * a name and a range of version numbers. A feature can be of two types:
+   *
+   * 1. Supported feature:
+   * A supported feature is represented by a name (String) and a range of 
versions (defined by a
+   * {@link SupportedVersionRange}). It refers to a feature that a particular 
broker advertises
+   * support for. Each broker advertises the version ranges of its own 
supported features in its
+   * own BrokerIdZNode. The contents of the advertisement are specific to the 
particular broker and
+   * do not represent any guarantee of a cluster-wide availability of the 
feature for any particular
+   * range of versions.
+   *
+   * 2. Finalized feature:
+   * A finalized feature is represented by a name (String) and a range of 
version levels (defined
+   * by a {@link FinalizedVersionRange}). Whenever the feature versioning 
system (KIP-584) is
+   * enabled, the finalized features are stored in the cluster-wide common 
FeatureZNode.
+   * In comparison to a supported feature, the key difference is that a 
finalized feature exists
+   * in ZK only when it is guaranteed to be supported by any random broker in 
the cluster for a
+   * specified range of version levels. Also, the controller is the only 
entity modifying the
+   * information about finalized features.
+   *
+   * This method sets up the FeatureZNode with enabled status, which means 
that the finalized
+   * features stored in the FeatureZNode are active. The enabled status should 
be written by the
+   * controller to the FeatureZNode only when the broker IBP config is greater 
than or equal to
+   * KAFKA_2_7_IV0.
+   *
+   * There are multiple cases handled here:
+   *
+   * 1. New cluster bootstrap:
+   *A new Kafka cluster (i.e. it is deployed first time) is almost always 
started with IBP config
+   *setting greater than or equal to KAFKA_2_7_IV0. We would like to start 
the cluster with all
+   *the possible supported features finalized immediately. Assuming this 
is the case, the
+   *controller will start up and notice that the FeatureZNode is absent in 
the new cluster,
+   *it will then create a FeatureZNode (with enabled status) containing 
the entire list of
+   *default supported features as its finalized features.
+   *
+   * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0:
+   *Imagine there is an existing Kafka cluster with IBP config less than 
KAFKA_2_7_IV0, and the
+   *broker binary has been upgraded to a newer version that supports the 
feature versioning
+   *system (KIP-584). This means the user is upgrading from an earlier 
version of the broker
+   *binary. In this case, we want to start with no finalized features and 
allow the user to
+   *finalize them whenever they are ready i.e. in the future whenever the 
user sets IBP config
+   *to be greater than or equal to KAFKA_2_7_IV0, then the user could 
start finalizing the
+   *features. This process ensures we do not enable all the possible 
features immediately after
+   *an upgrade, which could be harmful to Kafka.
+   *This is how we handle such a case:
+   *  - Before the IBP config upgrade (i.e. IBP config set to less than 
KAFKA_2_7_IV0), the
+   *controller will start up and check if the FeatureZNode is absent. 
If absent, it will
+   *react by creating a FeatureZNode with disabled status and empty 
finalized features.
+   *Otherwise, if a node already exists in enabled status then the 
controller will just
+   *flip the status to disabled and clear the finalized features.
+   *  - After the IBP config upgrade (i.e. IBP config set to greater than 
or equal to
+   *KAFKA_2_7_IV0), when the controller starts up it will check if the 
FeatureZNode exists
+   *and whether it is disabled. In such a case, it won’t upgrade all 
features immediately.
+   *Instead it will just switch the FeatureZNode status to enabled 
status. This lets

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463916219



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3615,6 +3662,137 @@ public void testListOffsetsNonRetriableErrors() throws 
Exception {
 }
 }
 
+@Test
+public void testUpdateFeaturesDuringSuccess() throws Exception {
+testUpdateFeatures(
+makeTestFeatureUpdates(),
+makeTestFeatureUpdateErrors(Errors.NONE));
+}
+
+@Test
+public void testUpdateFeaturesInvalidRequestError() throws Exception {
+testUpdateFeatures(
+makeTestFeatureUpdates(),
+makeTestFeatureUpdateErrors(Errors.INVALID_REQUEST));
+}
+
+@Test
+public void testUpdateFeaturesUpdateFailedError() throws Exception {
+testUpdateFeatures(
+makeTestFeatureUpdates(),
+makeTestFeatureUpdateErrors(Errors.FEATURE_UPDATE_FAILED));
+}
+
+@Test
+public void testUpdateFeaturesPartialSuccess() throws Exception {
+final Map<String, Errors> errors = 
makeTestFeatureUpdateErrors(Errors.NONE);
+errors.put("test_feature_2", Errors.INVALID_REQUEST);
+testUpdateFeatures(makeTestFeatureUpdates(), errors);
+}
+
+private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
+return Utils.mkMap(
+Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2, 
false)),
+Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3, 
true)));
+}
+
+private Map<String, Errors> makeTestFeatureUpdateErrors(final Errors 
error) {
+final Map<String, FeatureUpdate> updates = makeTestFeatureUpdates();
+final Map<String, Errors> errors = new HashMap<>();
+for (Map.Entry<String, FeatureUpdate> entry : updates.entrySet()) {
+errors.put(entry.getKey(), error);
+}
+return errors;
+}
+
+private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
+Map<String, Errors> featureUpdateErrors) 
throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().prepareResponse(
+body -> body instanceof UpdateFeaturesRequest,
+prepareUpdateFeaturesResponse(featureUpdateErrors));
+final Map<String, KafkaFuture<Void>> futures = 
env.adminClient().updateFeatures(
+featureUpdates,
+new UpdateFeaturesOptions().timeoutMs(1)).values();
+for (Map.Entry<String, KafkaFuture<Void>> entry : 
futures.entrySet()) {
+final KafkaFuture<Void> future = entry.getValue();
+final Errors error = featureUpdateErrors.get(entry.getKey());
+if (error == Errors.NONE) {
+future.get();
+} else {
+final ExecutionException e = 
assertThrows(ExecutionException.class,
+() -> future.get());

Review comment:
   Isn't that what I'm using currently?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463916470



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ *   - {@link Errors#NOT_CONTROLLER}
+ *   - {@link Errors#INVALID_REQUEST}
+ *   - {@link Errors#FEATURE_UPDATE_FAILED}
+ */
+public class UpdateFeaturesResponse extends AbstractResponse {
+
+private final UpdateFeaturesResponseData data;
+
+public UpdateFeaturesResponse(UpdateFeaturesResponseData data) {
+this.data = data;
+}
+
+public UpdateFeaturesResponse(Struct struct) {
+final short latestVersion = (short) 
(UpdateFeaturesResponseData.SCHEMAS.length - 1);
+this.data = new UpdateFeaturesResponseData(struct, latestVersion);
+}
+
+public UpdateFeaturesResponse(Struct struct, short version) {
+this.data = new UpdateFeaturesResponseData(struct, version);
+}
+
+public Map errors() {
+return data.results().valuesSet().stream().collect(
+Collectors.toMap(
+result -> result.feature(),

Review comment:
   Like how? I don't understand.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463916688



##
File path: 
clients/src/main/resources/common/message/UpdateFinalizedFeaturesResponse.json
##
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 50,
+  "type": "response",
+  "name": "UpdateFinalizedFeaturesResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The error code or `0` if there was no error." },
+{ "name": "ErrorMessage", "type": "string", "versions": "0+", 
"nullableVersions": "0+",

Review comment:
   Done now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463916610



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -983,8 +1144,25 @@ class KafkaController(val config: KafkaConfig,
*/
   private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], 
partitions: Set[TopicPartition]): Unit = {
 try {
+  val filteredBrokers = scala.collection.mutable.Set[Int]() ++ brokers
+  if (config.isFeatureVersioningEnabled) {
+def hasIncompatibleFeatures(broker: Broker): Boolean = {
+  val latestFinalizedFeatures = featureCache.get
+  if (latestFinalizedFeatures.isDefined) {
+BrokerFeatures.hasIncompatibleFeatures(broker.features, 
latestFinalizedFeatures.get.features)
+  } else {
+false
+  }
+}
+controllerContext.liveOrShuttingDownBrokers.foreach(broker => {
+  if (filteredBrokers.contains(broker.id) && 
hasIncompatibleFeatures(broker)) {

Review comment:
   If the broker has feature incompatibilities, then it should die as soon 
as it has received the ZK update (it would die from within 
`FinalizedFeatureChangeListener`).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] albert02lowis commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-07-31 Thread GitBox


albert02lowis commented on pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#issuecomment-667462930


   Hi, there's another unit test that needs to be moved out 
(`StreamStreamJoinIntegrationTest.shouldNotAccessJoinStoresWhenGivingName`) but 
I thought of doing that in another PR (to make the PRs small). But let me know 
if it's ok to just include it in this PR.
   
   (Btw, I force-push updated the commit with the right user email)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-31 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463916470



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+
+/**
+ * Possible error codes:
+ *
+ *   - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
+ *   - {@link Errors#NOT_CONTROLLER}
+ *   - {@link Errors#INVALID_REQUEST}
+ *   - {@link Errors#FEATURE_UPDATE_FAILED}
+ */
+public class UpdateFeaturesResponse extends AbstractResponse {
+
+private final UpdateFeaturesResponseData data;
+
+public UpdateFeaturesResponse(UpdateFeaturesResponseData data) {
+this.data = data;
+}
+
+public UpdateFeaturesResponse(Struct struct) {
+final short latestVersion = (short) 
(UpdateFeaturesResponseData.SCHEMAS.length - 1);
+this.data = new UpdateFeaturesResponseData(struct, latestVersion);
+}
+
+public UpdateFeaturesResponse(Struct struct, short version) {
+this.data = new UpdateFeaturesResponseData(struct, version);
+}
+
+public Map errors() {
+return data.results().valuesSet().stream().collect(
+Collectors.toMap(
+result -> result.feature(),

Review comment:
   Like how? I don't understand. Isn't that what I'm doing currently?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9108: KAFKA-9273: Extract testShouldAutoShutdownOnIncompleteMetadata from S…

2020-07-31 Thread GitBox


abbccdda commented on pull request #9108:
URL: https://github.com/apache/kafka/pull/9108#issuecomment-667466886


   @albert02lowis I see, let's try to do all of them in one PR then.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-07-31 Thread GitBox


abbccdda commented on a change in pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#discussion_r463920622



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -228,6 +228,13 @@ public Password getPassword(String key) {
 return copy;
 }
 
+public Map<String, Object> originals(Map<String, Object> configOverrides) {

Review comment:
   s/originals/copyWithOverride?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10336) Rolling upgrade with Suppression AND Standbys may throw exceptions

2020-07-31 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17169229#comment-17169229
 ] 

John Roesler commented on KAFKA-10336:
--

Good thinking, Sophie, that does indeed seem possible. 

> Rolling upgrade with Suppression AND Standbys may throw exceptions
> --
>
> Key: KAFKA-10336
> URL: https://issues.apache.org/jira/browse/KAFKA-10336
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: John Roesler
>Priority: Blocker
>  Labels: bug, user-experience
> Fix For: 2.7.0
>
>
> Tl;dr:
> If you have standbys AND use Suppress with changelogging enabled, you may 
> experience exceptions leading to threads shutting down on the OLD instances 
> during a rolling upgrade. No corruption is expected, and when the rolling 
> upgrade completes, all threads will be running and processing correctly.
> Details:
> The Suppression changelog has had to change its internal data format several 
> times to fix bugs. The binary schema of the changelog values is determined by 
> a version header on the records, and new versions are able to decode all old 
> versions' formats.
> The suppression changelog decoder is also configured to throw an exception if 
> it encounters a version number that it doesn't recognize, causing the thread 
> to stop processing and shut down.
> When standbys are configured, there is one so-called "active" worker writing 
> into the suppression buffer and sending the same messages into the changelog, 
> while another "standby" worker reads those messages, decodes them, and 
> maintains a hot-standby replica of the suppression buffer.
> If the standby worker is running and older version of Streams than the active 
> worker, what can happen today is that the active worker may write changelog 
> messages with a higher version number than the standby worker can understand. 
> When the standby worker receives one of these messages, it will throw the 
> exception and shut down its thread.
> Note, although the exceptions are undesired, at least this behavior protects 
> the integrity of the application and prevents data corruption or loss.
> Workaround:
> Several workarounds are possible:
> This only affects clusters that do all of (A) rolling bounce, (B) 
> suppression, (C) standby replicas, (D) changelogged suppression buffers. 
> Changing any of those four variables will prevent the issue from occurring. I 
> would NOT recommend disabling (D), and (B) is probably off the table, since 
> the application logic presumably depends on it. Therefore, your practical 
> choices are to disable standbys (C), or to do a full-cluster bounce (A). 
> Personally, I think (A) is the best option.
> Also note, although the exceptions and threads shutting down are not ideal, 
> they would only afflict the old-versioned nodes. I.e., the nodes you intend 
> to replace anyway. So another "workaround" is simply to ignore the exceptions 
> and proceed with the rolling bounce. As the old-versioned nodes are replaced 
> with new-versioned nodes, the new nodes will again be able to decode their 
> peers' changelog messages and be able to maintain the hot-standby replicas of 
> the suppression buffers.
> Detection:
> Although I really should have anticipated this condition, I first detected it 
> while expanding our system test coverage as part of KAFKA-10173. I added a 
> rolling upgrade test with an application that uses both suppression and 
> standby replicas, and observed that the rolling upgrades would occasionally 
> cause the old nodes to crash. Accordingly, in KAFKA-10173, I disabled the 
> rolling-upgrade configuration and only do full-cluster upgrades. Resolving 
> _this_ ticket will allow us to re-enable rolling upgrades.
> Proposed solution:
> Part 1:
> Since Streams can decode both current and past versions, but not future 
> versions, we need to implement a mechanism to prevent new-versioned nodes 
> from writing new-versioned messages, which would appear as future-versioned 
> messages to the old-versioned nodes.
> We have an UPGRADE_FROM configuration that we could leverage to accomplish 
> this. In that case, when upgrading from 2.3 to 2.4, you would set 
> UPGRADE_FROM to "2.3", and then do a rolling upgrade to 2.4. The new (2.4) 
> nodes would continue writing messages in the old (2.3) format. Thus, the 
> still-running old nodes will still be able to read them.
> Then, you would remove the UPGRADE_FROM config and do ANOTHER rolling bounce. 
> Post-bounce, the nodes would start writing in the 2.4 format, which is ok 
> because all the members are running 2.4 at this point and can decode these 
> messages, even if they are still configured to write with version

[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-07-31 Thread GitBox


chia7712 commented on pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#issuecomment-667482560


   > Btw, I don't think this is an improvement rather than a bug, as we don't 
have any guarantee to see client id in serdes before.
   
   There is another reason: we do pass the generated client id to the metric 
reporter. It seems to me all plugins should see consistent props.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…

2020-07-31 Thread GitBox


chia7712 commented on a change in pull request #9102:
URL: https://github.com/apache/kafka/pull/9102#discussion_r463930430



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -228,6 +228,13 @@ public Password getPassword(String key) {
 return copy;
 }
 
+public Map<String, Object> originals(Map<String, Object> configOverrides) {

Review comment:
   Will copy that





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-31 Thread GitBox


dongjinleekr commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-667483663


   @tombentley Congratulations! :congratulations: @omkreddy @mimaison Thanks 
again for the detailed review, as usual! :smiley: 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org