[jira] [Updated] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes
[ https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anders Aagaard updated KAFKA-8201: -- Affects Version/s: 2.0.0 > Kafka streams repartitioning topic settings crashing multiple nodes > --- > > Key: KAFKA-8201 > URL: https://issues.apache.org/jira/browse/KAFKA-8201 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Anders Aagaard >Priority: Major > > We had an incident in a setup using kafka streams version 2.0.0 and kafka > version 2.0.0. The reason for it is a combination of kafka streams defaults > and a bug in kafka. > Info about the setup: Streams application reading a log compacted input > topic, and performing a groupby operation requiring repartitioning. > Kafka streams automatically creates a repartitioning topic with 24 partitions > and the following options: > segment.bytes=52428800, retention.ms=9223372036854775807, > segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. > > This should mean we roll out a new segment when the active one reaches 50mb > or is older than 10 minutes. However, the different timestamps coming into > the topic due to log compaction (sometimes varying by multiple days) means > the server will see a message which is older than segment.ms and > automatically trigger a new segment roll out. This causes a segment > explosion, where new segments are continuously rolled out. > There seems to be a bug report for this on the server side here: > https://issues.apache.org/jira/browse/KAFKA-4336. > This effectively took down several nodes and a broker in our cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes
Anders Aagaard created KAFKA-8201: - Summary: Kafka streams repartitioning topic settings crashing multiple nodes Key: KAFKA-8201 URL: https://issues.apache.org/jira/browse/KAFKA-8201 Project: Kafka Issue Type: Bug Components: streams Reporter: Anders Aagaard We had an incident in a setup using kafka streams version 2.0.0 and kafka version 2.0.0. The reason for it is a combination of kafka streams defaults and a bug in kafka. Info about the setup: Streams application reading a log compacted input topic, and performing a groupby operation requiring repartitioning. Kafka streams automatically creates a repartitioning topic with 24 partitions and the following options: segment.bytes=52428800, retention.ms=9223372036854775807, segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. This should mean we roll out a new segment when the active one reaches 50mb or is older than 10 minutes. However, the different timestamps coming into the topic due to log compaction (sometimes varying by multiple days) means the server will see a message which is older than segment.ms and automatically trigger a new segment roll out. This causes a segment explosion, where new segments are continuously rolled out. There seems to be a bug report for this on the server side here: https://issues.apache.org/jira/browse/KAFKA-4336. This effectively took down several nodes and a broker in our cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes
[ https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813069#comment-16813069 ] Anders Aagaard commented on KAFKA-8201: --- Another question here: Why use infinite retention and cleanup.policy delete instead of log compaction for this case? > Kafka streams repartitioning topic settings crashing multiple nodes > --- > > Key: KAFKA-8201 > URL: https://issues.apache.org/jira/browse/KAFKA-8201 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Anders Aagaard >Priority: Major > > We had an incident in a setup using kafka streams version 2.0.0 and kafka > version 2.0.0. The reason for it is a combination of kafka streams defaults > and a bug in kafka. > Info about the setup: Streams application reading a log compacted input > topic, and performing a groupby operation requiring repartitioning. > Kafka streams automatically creates a repartitioning topic with 24 partitions > and the following options: > segment.bytes=52428800, retention.ms=9223372036854775807, > segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. > > This should mean we roll out a new segment when the active one reaches 50mb > or is older than 10 minutes. However, the different timestamps coming into > the topic due to log compaction (sometimes varying by multiple days) means > the server will see a message which is older than segment.ms and > automatically trigger a new segment roll out. This causes a segment > explosion, where new segments are continuously rolled out. > There seems to be a bug report for this on the server side here: > https://issues.apache.org/jira/browse/KAFKA-4336. > This effectively took down several nodes and a broker in our cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes
[ https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anders Aagaard updated KAFKA-8201: -- Description: We had an incident in a setup using kafka streams version 2.0.0 and kafka version 2.0.0, protocol version 2.0-IV1. The reason for it is a combination of kafka streams defaults and a bug in kafka. Info about the setup: Streams application reading a log compacted input topic, and performing a groupby operation requiring repartitioning. Kafka streams automatically creates a repartitioning topic with 24 partitions and the following options: segment.bytes=52428800, retention.ms=9223372036854775807, segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. This should mean we roll out a new segment when the active one reaches 50mb or is older than 10 minutes. However, the different timestamps coming into the topic due to log compaction (sometimes varying by multiple days) means the server will see a message which is older than segment.ms and automatically trigger a new segment roll out. This causes a segment explosion, where new segments are continuously rolled out. There seems to be a bug report for this on the server side here: https://issues.apache.org/jira/browse/KAFKA-4336. This effectively took down several nodes and a broker in our cluster. was: We had an incident in a setup using kafka streams version 2.0.0 and kafka version 2.0.0. The reason for it is a combination of kafka streams defaults and a bug in kafka. Info about the setup: Streams application reading a log compacted input topic, and performing a groupby operation requiring repartitioning. Kafka streams automatically creates a repartitioning topic with 24 partitions and the following options: segment.bytes=52428800, retention.ms=9223372036854775807, segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. This should mean we roll out a new segment when the active one reaches 50mb or is older than 10 minutes. However, the different timestamps coming into the topic due to log compaction (sometimes varying by multiple days) means the server will see a message which is older than segment.ms and automatically trigger a new segment roll out. This causes a segment explosion, where new segments are continuously rolled out. There seems to be a bug report for this on the server side here: https://issues.apache.org/jira/browse/KAFKA-4336. This effectively took down several nodes and a broker in our cluster. > Kafka streams repartitioning topic settings crashing multiple nodes > --- > > Key: KAFKA-8201 > URL: https://issues.apache.org/jira/browse/KAFKA-8201 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Anders Aagaard >Priority: Major > > We had an incident in a setup using kafka streams version 2.0.0 and kafka > version 2.0.0, protocol version 2.0-IV1. The reason for it is a combination of > kafka streams defaults and a bug in kafka. > Info about the setup: Streams application reading a log compacted input > topic, and performing a groupby operation requiring repartitioning. > Kafka streams automatically creates a repartitioning topic with 24 partitions > and the following options: > segment.bytes=52428800, retention.ms=9223372036854775807, > segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. > > This should mean we roll out a new segment when the active one reaches 50mb > or is older than 10 minutes. 
However, the different timestamps coming into > the topic due to log compaction (sometimes varying by multiple days) means > the server will see a message which is older than segment.ms and > automatically trigger a new segment roll out. This causes a segment > explosion, where new segments are continuously rolled out. > There seems to be a bug report for this on the server side here: > https://issues.apache.org/jira/browse/KAFKA-4336. > This effectively took down several nodes and a broker in our cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
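The options listed in the description are the Kafka Streams defaults for internal repartition topics, and they can be overridden from the application before the topics are created. Below is a minimal sketch, assuming the StreamsConfig.topicPrefix()/TopicConfig approach (the same workaround suggested in a later comment on this issue); the application id, bootstrap servers and the concrete values are placeholders, not recommendations.
{code:java}
// Sketch only: pre-set internal topic configs via StreamsConfig.topicPrefix().
// Note that these prefixed settings apply to ALL internal topics of the application.
import java.util.Properties;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionTopicConfigSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Raise segment.ms so widely varying timestamps from the compacted input
        // do not force a segment roll on almost every append.
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "3600000");
        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), "52428800");
        return props;
    }
}
{code}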
[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow
[ https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813140#comment-16813140 ] Jonathan Santilli commented on KAFKA-7656: -- Hello [~jagsancio], this is the only log I see (on the leader): {noformat} [2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing fetch with max size -2147483648 from consumer on partition __consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(file= /opt/kafka/logdata/__consumer_offsets-14/.log, start=0, end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at kafka.log.Log.$anonfun$read$2(Log.scala:1245) at kafka.log.Log.maybeHandleIOException(Log.scala:2013) at kafka.log.Log.read(Log.scala:1200) at kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at kafka.cluster.Partition.readRecords(Partition.scala:781) at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at kafka.server.KafkaApis.handle(KafkaApis.scala:109) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748) {noformat} Is the only partition with the problem so far. 
> ReplicaManager fetch fails on leader due to long/integer overflow > - > > Key: KAFKA-7656 > URL: https://issues.apache.org/jira/browse/KAFKA-7656 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.1 > Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 > EDT 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: Patrick Haas >Assignee: Jose Armando Garcia Sancio >Priority: Major > > (Note: From 2.0.1-cp1 from confluent distribution) > {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error > processing fetch operation on partition __consumer_offsets-20, offset 0 > (kafka.server.ReplicaManager)}} > {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log > read from segment FileRecords(file= > /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, > start=0, end=2147483647)}} > {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}} > {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}} > {{ at kafka.log.Log.read(Log.scala:1114)}} > {{ at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}} > {{ at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}} > {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}} > {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}} > {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}} > {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}} > {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}} > {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}} > {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}} > {{ at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
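For context, the maxBytes value -2147483648 reported above is Integer.MIN_VALUE, which is what the "long/integer overflow" in the title refers to. The snippet below only illustrates that wrap-around; it is not the broker's actual code path.
{code:java}
// Illustration only: narrowing a long limit to int wraps past Integer.MAX_VALUE
// and yields -2147483648, the "max size" value shown in the error above.
public class OverflowIllustration {
    public static void main(String[] args) {
        long requestedLimit = Integer.MAX_VALUE + 1L; // 2147483648, one past the int range
        int narrowed = (int) requestedLimit;          // wraps to -2147483648 (Integer.MIN_VALUE)
        int clamped = (int) Math.min(requestedLimit, Integer.MAX_VALUE); // 2147483647
        System.out.println(narrowed + " vs " + clamped);
    }
}
{code}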
[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow
[ https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813140#comment-16813140 ] Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 8:53 AM: -- Hello [~jagsancio], this is the only log I see (on the leader): {code:java} [2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing fetch with max size -2147483648 from consumer on partition __consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(file= /opt/kafka/logdata/__consumer_offsets-14/.log, start=0, end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at kafka.log.Log.$anonfun$read$2(Log.scala:1245) at kafka.log.Log.maybeHandleIOException(Log.scala:2013) at kafka.log.Log.read(Log.scala:1200) at kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at kafka.cluster.Partition.readRecords(Partition.scala:781) at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at kafka.server.KafkaApis.handle(KafkaApis.scala:109) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748){code} Is the only partition with the problem so far. 
was (Author: pachilo): Hello [~jagsancio], this is the only log I see (on the leader): {noformat} [2019-04-08 22:35:27,145] ERROR [ReplicaManager broker=1] Error processing fetch with max size -2147483648 from consumer on partition __consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(file= /opt/kafka/logdata/__consumer_offsets-14/.log, start=0, end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at kafka.log.Log.$anonfun$read$2(Log.scala:1245) at kafka.log.Log.maybeHandleIOException(Log.scala:2013) at kafka.log.Log.read(Log.scala:1200) at kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at kafka.cluster.Partition.readRecords(Partition.scala:781) at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at kafka.server.KafkaApis.handle(KafkaApis.scala:109) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748) {noformat} Is the only partition with the problem so far. > ReplicaManager fetch fails on leader due to long/integer overflow > - > > Key: KAFKA-7656 > URL: https://issues.apache.org/jira/browse/KAFKA-7656 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.1 > Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 > EDT 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: Patrick Haas >Assignee: Jose Armando Garcia Sancio >Priority: Major > > (Note: From 2.0.1-cp1 from confluent distribution) > {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error > processing fetch operation on partition __consumer_offsets-20, offset 0 > (kafka.server.ReplicaManager)}} > {{java.lang.IllegalArgumentExceptio
[jira] [Created] (KAFKA-8202) StackOverflowError on producer when splitting batches
Daniel Krawczyk created KAFKA-8202: -- Summary: StackOverflowError on producer when splitting batches Key: KAFKA-8202 URL: https://issues.apache.org/jira/browse/KAFKA-8202 Project: Kafka Issue Type: Bug Affects Versions: 2.0.0 Reporter: Daniel Krawczyk Hello, recently we came across a StackOverflowError in the Kafka producer Java library. The error caused the Kafka producer to stop. The stack trace was as follows: {code:java} java.lang.StackOverflowError: null at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) // […] {code} The piece of code responsible for the error: {code:java} /** * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the * future that has already returned to the users to wait on the newly created split batches even after the * old big batch has been deemed as done. */ void chain(FutureRecordMetadata futureRecordMetadata) { if (nextRecordMetadata == null) nextRecordMetadata = futureRecordMetadata; else nextRecordMetadata.chain(futureRecordMetadata); } {code} Before the error occurred we observed a large number of log entries related to record batches being split (caused by MESSAGE_TOO_LARGE error) on one of our topics (logged by org.apache.kafka.clients.producer.internals.Sender): {code:java} [Producer clientId=producer-1] Got error produce response in correlation id 158621342 on topic-partition , splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE {code} All log entries had different correlation ids, but the same count of attempts left (2147483647), so it looked like they were related to different requests and all of them were succeeding with no further retries. We are using the kafka-clients Java library version 2.0.0; the brokers are on 2.1.1. Thanks in advance. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
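The chain() method quoted above recurses one level per batch already linked, so a long run of MESSAGE_TOO_LARGE splits keeps deepening the call stack until it overflows. The following is only a sketch of how the same linking could be written iteratively; it is a drop-in variant of the quoted method, not the project's actual fix.
{code:java}
// Sketch only, not the upstream fix: append to the end of the chain iteratively,
// so the stack depth stays constant no matter how many batches were chained before.
void chain(FutureRecordMetadata futureRecordMetadata) {
    FutureRecordMetadata current = this;
    while (current.nextRecordMetadata != null) {
        current = current.nextRecordMetadata; // walk to the current tail of the chain
    }
    current.nextRecordMetadata = futureRecordMetadata;
}
{code}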
[jira] [Updated] (KAFKA-8202) StackOverflowError on producer when splitting batches
[ https://issues.apache.org/jira/browse/KAFKA-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Krawczyk updated KAFKA-8202: --- Description: Hello, recently we came across a StackOverflowError in the Kafka producer Java library. The error caused the Kafka producer to stop (we had to restart our service due to: IllegalStateException: Cannot perform operation after producer has been closed). The stack trace was as follows: {code:java} java.lang.StackOverflowError: null at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) // […] {code} The piece of code responsible for the error: {code:java} /** * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the * future that has already returned to the users to wait on the newly created split batches even after the * old big batch has been deemed as done. */ void chain(FutureRecordMetadata futureRecordMetadata) { if (nextRecordMetadata == null) nextRecordMetadata = futureRecordMetadata; else nextRecordMetadata.chain(futureRecordMetadata); } {code} Before the error occurred we observed a large number of log entries related to record batches being split (caused by MESSAGE_TOO_LARGE error) on one of our topics (logged by org.apache.kafka.clients.producer.internals.Sender): {code:java} [Producer clientId=producer-1] Got error produce response in correlation id 158621342 on topic-partition , splitting and retrying (2147483647 attempts left). Error: MESSAGE_TOO_LARGE {code} All log entries had different correlation ids, but the same count of attempts left (2147483647), so it looked like they were related to different requests and all of them were succeeding with no further retries. We are using the kafka-clients Java library version 2.0.0; the brokers are on 2.1.1. Thanks in advance. was: Hello, recently we came across a StackOverflowError in the Kafka producer Java library. The error caused the Kafka producer to stop. 
The stack trace was as follows: {code:java} java.lang.StackOverflowError: null at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.chain(FutureRecordMetadata.java:89) // […] {code} The piece of code responsible for the error: {code:java} /** * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the * future that has
[jira] [Commented] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null
[ https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813197#comment-16813197 ] Valeria Vasylieva commented on KAFKA-6755: -- [~rhauch] the discussion did not gain a lot of interest from the community; should I proceed to a vote, so that we can finally decide whether this change should be merged or discarded? > MaskField SMT should optionally take a literal value to use instead of using > null > - > > Key: KAFKA-6755 > URL: https://issues.apache.org/jira/browse/KAFKA-6755 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Randall Hauch >Assignee: Valeria Vasylieva >Priority: Major > Labels: needs-kip, newbie > Original Estimate: 8h > Remaining Estimate: 8h > > The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always > uses the null value for the type of field. It'd be nice to *optionally* be > able to specify a literal value for the type, where the SMT would convert the > literal string value in the configuration to the desired type (using the new > {{Values}} methods). > Use cases: mask out the IP address, or SSN, or other personally identifiable > information (PII). > Since this changes the API, it will require a KIP. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
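For readers unfamiliar with the SMT, here is a rough sketch of how MaskField is configured today and what the requested option might look like in a connector config; the replacement property name is hypothetical and would be settled by the KIP.
{code:java}
# Today: the listed fields are replaced with the field type's null/empty value.
transforms=mask
transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
transforms.mask.fields=ssn,ip_address

# Requested behaviour (hypothetical property name, subject to the KIP):
# transforms.mask.replacement=REDACTED
{code}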
[jira] [Commented] (KAFKA-6675) Connect workers should log plugin path and available plugins more clearly
[ https://issues.apache.org/jira/browse/KAFKA-6675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813204#comment-16813204 ] Valeria Vasylieva commented on KAFKA-6675: -- [~rhauch] thank you for clarification, I got it and would proceed with the task. > Connect workers should log plugin path and available plugins more clearly > - > > Key: KAFKA-6675 > URL: https://issues.apache.org/jira/browse/KAFKA-6675 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.1 >Reporter: Randall Hauch >Assignee: Valeria Vasylieva >Priority: Minor > > Users struggle with setting the plugin path and properly installing plugins. > If users get any of this wrong, they get strange errors only after they run > the worker and attempt to deploy connectors or use transformations. > The Connect worker should more obviously output the plugin path directories > and the available plugins. For example, if the {{plugin.path}} were: > {code} > plugin.path=/usr/local/share/java,/usr/local/plugins > {code} > then the worker might output something like the following information in the > log: > {noformat} > Looking for plugins on classpath and inside plugin.path directories: > /usr/local/share/java > /usr/local/plugins > > Source Connector(s): > FileStreamSource (org.apache.kafka.connect.file.FileStreamSourceConnector) > @ classpath > FileStreamSink(org.apache.kafka.connect.file.FileStreamSinkConnector) > @ classpath > JdbcSource(io.confluent.connect.jdbc.JdbcSourceConnector) > @ /usr/local/share/java/kafka-connect-jdbc > MySql (io.debezium.connector.mysql.MySqlConnector) > @ /usr/local/plugins/debezium-connector-mysql > Converter(s): > JsonConverter (org.apache.kafka.connect.json.JsonConverter) > @ classpath > ByteArrayConverter > (org.apache.kafka.connect.converters.ByteArrayConverter)@ classpath > SimpleHeaderConverter > (org.apache.kafka.connect.converters.SimpleHeaderConverter) @ classpath > AvroConverter (io.confluent.connect.avro.AvroConverter) > @ /usr/local/share/java/kafka-serde-tools > Transformation(s): > InsertField (org.apache.kafka.connect.transforms.InsertField) > @ classpath > ReplaceField (org.apache.kafka.connect.transforms.ReplaceField) > @ classpath > MaskField (org.apache.kafka.connect.transforms.MaskField) > @ classpath > ValueToKey(org.apache.kafka.connect.transforms.ValueToKey) > @ classpath > HoistField(org.apache.kafka.connect.transforms.HoistField) > @ classpath > ExtractField (org.apache.kafka.connect.transforms.ExtractField) > @ classpath > SetSchemaMetadata (org.apache.kafka.connect.transforms.SetSchemaMetadata) > @ classpath > RegexRouter (org.apache.kafka.connect.transforms.RegexRouter) > @ classpath > TimestampRouter (org.apache.kafka.connect.transforms.TimestampRouter) > @ classpath > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow
[ https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813228#comment-16813228 ] Jonathan Santilli commented on KAFKA-7656: -- Enabling the TRACE level shows a lot, I can share a line before and after the error on the Leader Broker for partition *__consumer_offsets-14*: {code:java} [2019-04-09 11:05:26,877] TRACE [ReplicaManager broker=1] Fetching log segment for partition __consumer_offsets-14, offset 0, partition fetch size -2147483648, remaining response limit 2147483647 (kafka.server.ReplicaManager) [2019-04-09 11:05:26,877] ERROR [ReplicaManager broker=1] Error processing fetch with max size -2147483648 from consumer on partition __consumer_offsets-14: (fetchOffset=0, logStartOffset=-1, maxBytes=-2147483648, currentLeaderEpoch=Optional.empty) (kafka.server.ReplicaManager) java.lang.IllegalArgumentException: Invalid max size -2147483648 for log read from segment FileRecords(file= /opt/kafka/logdata/__consumer_offsets-14/.log, start=0, end=2147483647) at kafka.log.LogSegment.read(LogSegment.scala:274) at kafka.log.Log.$anonfun$read$2(Log.scala:1245) at kafka.log.Log.maybeHandleIOException(Log.scala:2013) at kafka.log.Log.read(Log.scala:1200) at kafka.cluster.Partition.$anonfun$readRecords$1(Partition.scala:805) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) at kafka.cluster.Partition.readRecords(Partition.scala:781) at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:926) at kafka.server.ReplicaManager.$anonfun$readFromLocalLog$4(ReplicaManager.scala:991) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:990) at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:840) at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:845) at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:723) at kafka.server.KafkaApis.handle(KafkaApis.scala:109) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748) [2019-04-09 11:05:26,879] TRACE [ReplicaManager broker=1] Fetching log segment for partition __consumer_offsets-26, offset 0, partition fetch size 1048576, remaining response limit 2147483647 (kafka.server.ReplicaManager){code} I see the logs says *offset 0*, but the last offset at the moment was *53,858,848.* Maybe is not related, but worth to mention. Now the leader is *broker=3*, and also there is showing the same Exception. 
> ReplicaManager fetch fails on leader due to long/integer overflow > - > > Key: KAFKA-7656 > URL: https://issues.apache.org/jira/browse/KAFKA-7656 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.1 > Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 > EDT 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: Patrick Haas >Assignee: Jose Armando Garcia Sancio >Priority: Major > > (Note: From 2.0.1-cp1 from confluent distribution) > {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error > processing fetch operation on partition __consumer_offsets-20, offset 0 > (kafka.server.ReplicaManager)}} > {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log > read from segment FileRecords(file= > /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, > start=0, end=2147483647)}} > {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}} > {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}} > {{ at kafka.log.Log.read(Log.scala:1114)}} > {{ at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}} > {{ at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}} > {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}} > {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}} > {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}} > {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}} > {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}} > {{ at ka
[jira] [Created] (KAFKA-8203) plaintext connections to SSL secured broker can be handled more elegantly
Jorg Heymans created KAFKA-8203: --- Summary: plaintext connections to SSL secured broker can be handled more elegantly Key: KAFKA-8203 URL: https://issues.apache.org/jira/browse/KAFKA-8203 Project: Kafka Issue Type: Improvement Affects Versions: 2.1.1 Reporter: Jorg Heymans Mailing list thread: [https://lists.apache.org/thread.html/39935157351c0ad590e6cf02027816d664f1fd3724a25c1133a3bba6@%3Cusers.kafka.apache.org%3E] -reproduced here We have our brokers secured with these standard properties {code:java} listeners=SSL://a.b.c:9030 ssl.truststore.location=... ssl.truststore.password=... ssl.keystore.location=... ssl.keystore.password=... ssl.key.password=... ssl.client.auth=required ssl.enabled.protocols=TLSv1.2 {code} It's a bit surprising to see that when a (java) client attempts to connect without having SSL configured, so doing a PLAINTEXT connection instead, it does not get a TLS exception indicating that SSL is required. Somehow i would have expected a hard transport-level exception making it clear that non-SSL connections are not allowed, instead the client sees this (when debug logging is enabled) {code:java} [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 21234bee31165527 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Kafka consumer initialized [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-1, groupId=my-test-group] Subscribed to topic(s): events [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-test-group] Sending FindCoordinator request to broker a.b.c:9030 (id: -1 rack: null) [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating connection to node a.b.c:9030 (id: -1 rack: null) using address /a.b.c [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency [main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Completed connection to node -1. Fetching API versions. [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Initiating API versions fetch from node -1. 
[main] DEBUG org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, groupId=my-test-group] Connection with /a.b.c disconnected java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) at org.apache.kafka.common.network.Selector.poll(Selector.java:467) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) at eu.europa.ec.han.TestConsumer.main(TestConsumer.java:22) [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Node -1 disconnected. [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=0) due to node -1 being disconnected [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer
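For reference, the client-side settings that match such a broker look roughly like the following (paths and passwords are placeholders; the keystore entries are needed because the broker sets ssl.client.auth=required). The point of this issue is that when these are missing, the failure surfaces only as the EOFException above rather than a clear transport-level "SSL required" error.
{code:java}
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=...
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=...
ssl.key.password=...
ssl.enabled.protocols=TLSv1.2
{code}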
[jira] [Updated] (KAFKA-8194) MessagesInPerSec incorrect value when transactional messaging is enabled
[ https://issues.apache.org/jira/browse/KAFKA-8194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Odyldzhon Toshbekov updated KAFKA-8194: --- Summary: MessagesInPerSec incorrect value when transactional messaging is enabled (was: MessagesInPerSec incorrect value when Stream produce messages) > MessagesInPerSec incorrect value when transactional messaging is enabled > -- > > Key: KAFKA-8194 > URL: https://issues.apache.org/jira/browse/KAFKA-8194 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 1.1.0, 2.2.0 >Reporter: Odyldzhon Toshbekov >Priority: Trivial > Attachments: Screen Shot 2019-04-05 at 17.51.03.png, Screen Shot > 2019-04-05 at 17.52.22.png > > > Looks like the metric > {code:java} > kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec{code} > has an incorrect value when messages come via the Kafka Streams API. > I noticed that the offset for every message from Kafka Streams can be increased by > 1, 2, ... However, if messages come to the Broker from a Kafka producer it's always > incremented by 1. > Unfortunately the metric mentioned above is calculated based on offset changes > and as a result we cannot use streams because the metric will always be incorrect. > For Kafka 2.2.0 > !Screen Shot 2019-04-05 at 17.51.03.png|width=100%! > > [https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/server/ReplicaManager.scala] > And this is the method used to get "numAppendedMessages" > !Screen Shot 2019-04-05 at 17.52.22.png|width=100%! > https://github.com/apache/kafka/blob/2.2.0/core/src/main/scala/kafka/log/Log.scala -- This message was sent by Atlassian JIRA (v7.6.3#76005)
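As a rough illustration of the reported effect (not the broker's actual accounting code): with transactions, commit/abort control records occupy offsets in the partition, so a count derived purely from offset deltas over-reports the number of application messages.
{code:java}
// Illustration only, with made-up numbers: offset-delta counting vs. actual messages.
public class OffsetDeltaIllustration {
    public static void main(String[] args) {
        long firstOffset = 100L;
        long lastOffset = 104L;   // say 3 application records plus 2 transaction markers
        long controlRecords = 2L;

        long offsetDeltaCount = lastOffset - firstOffset + 1;         // 5: what an offset-based metric sees
        long applicationMessages = offsetDeltaCount - controlRecords; // 3: what was actually produced

        System.out.println(offsetDeltaCount + " vs " + applicationMessages);
    }
}
{code}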
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813425#comment-16813425 ] Bill Bejeck commented on KAFKA-7965: Saw this failure again [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/20786/] {noformat} Error Message org.scalatest.junit.JUnitTestFailedError: Should have received an class org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster roll Stacktrace org.scalatest.junit.JUnitTestFailedError: Should have received an class org.apache.kafka.common.errors.GroupMaxSizeReachedException during the cluster roll at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100) at org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71) at org.scalatest.Assertions$class.fail(Assertions.scala:1089) at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71) at kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:344) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.ParentRunner.run(ParentRunner.java:412) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118) at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813550#comment-16813550 ] Dmitry Minkovsky commented on KAFKA-5998: - [~guozhang] I am using kafka_2.12-2.1.0. The behavior seems identical on my local dev environment macOS 10.14/APFS and whatever Linux GKE runs, with Kafka inside a container using data on ext4. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Priority: Critical > Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, > props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:32
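For context on the FileNotFoundException above: the OffsetCheckpoint.write frame points at the usual write-to-temp-file-then-rename pattern, sketched below with illustrative names. If the task's state directory has been removed or cleaned up concurrently, creating the .tmp file is the first step to fail, with exactly this exception.
{code:java}
// Sketch of the checkpoint-write pattern implied by the stack trace (names illustrative).
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class CheckpointWriteSketch {
    public static void writeCheckpoint(File stateDir, String contents) throws IOException {
        File tmp = new File(stateDir, ".checkpoint.tmp");
        // Throws FileNotFoundException if stateDir no longer exists.
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write(contents.getBytes(StandardCharsets.UTF_8));
            out.getFD().sync();
        }
        // Atomically replace the real checkpoint file with the completed temp file.
        Files.move(tmp.toPath(), new File(stateDir, ".checkpoint").toPath(),
                StandardCopyOption.ATOMIC_MOVE);
    }
}
{code}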
[jira] [Commented] (KAFKA-8201) Kafka streams repartitioning topic settings crashing multiple nodes
[ https://issues.apache.org/jira/browse/KAFKA-8201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813573#comment-16813573 ] Bill Bejeck commented on KAFKA-8201: Hi Anders Aagaard, I'm not sure this is a bug for Kafka Streams since this is known behavior (as you referenced in the above Jira) and is controlled by the broker. However, there is a workaround you can do. Kafka Streams users have control over the settings for repartition (internal) topics. When setting up your application you can adjust any of the settings you've listed above by using the StreamsConfig.topicPrefix method along with the relevant topic configuration value. For example: {code:java} final Properties props = new Properties(); props.put(StreamsConfig.topicPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), TopicConfig.CLEANUP_POLICY_COMPACT); props.put(StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG), "XX"); props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), "XXX");{code} Just note that any settings set this way will apply to all Kafka Streams internal topics. HTH, Bill > Kafka streams repartitioning topic settings crashing multiple nodes > --- > > Key: KAFKA-8201 > URL: https://issues.apache.org/jira/browse/KAFKA-8201 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Anders Aagaard >Priority: Major > > We had an incident in a setup using kafka streams version 2.0.0 and kafka > version 2.0.0, protocol version 2.0-IV1. The reason for it is a combination of > kafka streams defaults and a bug in kafka. > Info about the setup: Streams application reading a log compacted input > topic, and performing a groupby operation requiring repartitioning. > Kafka streams automatically creates a repartitioning topic with 24 partitions > and the following options: > segment.bytes=52428800, retention.ms=9223372036854775807, > segment.index.bytes=52428800, cleanup.policy=delete, segment.ms=60. > > This should mean we roll out a new segment when the active one reaches 50mb > or is older than 10 minutes. However, the different timestamps coming into > the topic due to log compaction (sometimes varying by multiple days) means > the server will see a message which is older than segment.ms and > automatically trigger a new segment roll out. This causes a segment > explosion, where new segments are continuously rolled out. > There seems to be a bug report for this on the server side here: > https://issues.apache.org/jira/browse/KAFKA-4336. > This effectively took down several nodes and a broker in our cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8203) plaintext connections to SSL secured broker can be handled more elegantly
[ https://issues.apache.org/jira/browse/KAFKA-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani reassigned KAFKA-8203: - Assignee: Satish Duggana > plaintext connections to SSL secured broker can be handled more elegantly > - > > Key: KAFKA-8203 > URL: https://issues.apache.org/jira/browse/KAFKA-8203 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.1.1 >Reporter: Jorg Heymans >Assignee: Satish Duggana >Priority: Major > > Mailing list thread: > [https://lists.apache.org/thread.html/39935157351c0ad590e6cf02027816d664f1fd3724a25c1133a3bba6@%3Cusers.kafka.apache.org%3E] > -reproduced here > We have our brokers secured with these standard properties > > {code:java} > listeners=SSL://a.b.c:9030 > ssl.truststore.location=... > ssl.truststore.password=... > ssl.keystore.location=... > ssl.keystore.password=... > ssl.key.password=... > ssl.client.auth=required > ssl.enabled.protocols=TLSv1.2 {code} > It's a bit surprising to see that when a (java) client attempts to connect > without having SSL configured, so doing a PLAINTEXT connection instead, it > does not get a TLS exception indicating that SSL is required. Somehow i would > have expected a hard transport-level exception making it clear that non-SSL > connections are not allowed, instead the client sees this (when debug logging > is enabled) > {code:java} > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : > 21234bee31165527 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer > - [Consumer clientId=consumer-1, groupId=my-test-group] Kafka consumer > initialized [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - > [Consumer clientId=consumer-1, groupId=my-test-group] Subscribed to topic(s): > events [main] DEBUG > org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer > clientId=consumer-1, groupId=my-test-group] Sending FindCoordinator request > to broker a.b.c:9030 (id: -1 rack: null) [main] DEBUG > org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, > groupId=my-test-group] Initiating connection to node a.b.c:9030 (id: -1 rack: > null) using address /a.b.c [main] DEBUG > org.apache.kafka.common.metrics.Metrics - Added sensor with name > node--1.bytes-sent [main] DEBUG org.apache.kafka.common.metrics.Metrics - > Added sensor with name node--1.bytes-received [main] DEBUG > org.apache.kafka.common.metrics.Metrics - Added sensor with name > node--1.latency [main] DEBUG org.apache.kafka.common.network.Selector - > [Consumer clientId=consumer-1, groupId=my-test-group] Created socket with > SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1 [main] DEBUG > org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, > groupId=my-test-group] Completed connection to node -1. Fetching API > versions. [main] DEBUG org.apache.kafka.clients.NetworkClient - [Consumer > clientId=consumer-1, groupId=my-test-group] Initiating API versions fetch > from node -1. 
[main] DEBUG org.apache.kafka.common.network.Selector - > [Consumer clientId=consumer-1, groupId=my-test-group] Connection with /a.b.c > disconnected java.io.EOFException at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381) > at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342) > at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609) at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541) > at org.apache.kafka.common.network.Selector.poll(Selector.java:467) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164) > at eu.europa.ec.han.TestConsumer.main(TestConsumer.java:22) [main] DEBUG > org.apache.kafka.clients.NetworkClient - [Consumer
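The disconnect above is what a PLAINTEXT client sees against an SSL listener: the broker cannot complete a TLS handshake, closes the socket, and the client surfaces it as an EOFException in the selector rather than an SSL-specific error. For comparison, a minimal client-side configuration for this broker setup would look roughly like the following (property names are the standard Kafka client SSL settings; the paths and passwords are placeholders, and the keystore entries are needed because the broker sets ssl.client.auth=required):
{code:java}
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=...
# keystore settings are required because the broker enforces ssl.client.auth=required
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=...
ssl.key.password=...
ssl.enabled.protocols=TLSv1.2
{code}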
[jira] [Commented] (KAFKA-5061) client.id should be set for Connect producers/consumers
[ https://issues.apache.org/jira/browse/KAFKA-5061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813603#comment-16813603 ] Paul Davidson commented on KAFKA-5061: -- Voting is now underway on [KIP-411|https://cwiki.apache.org/confluence/display/KAFKA/KIP-411%3A+Make+default+Kafka+Connect+worker+task+client+IDs+distinct]. Please vote if you would like this bug resolved! > client.id should be set for Connect producers/consumers > --- > > Key: KAFKA-5061 > URL: https://issues.apache.org/jira/browse/KAFKA-5061 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.1 >Reporter: Ewen Cheslack-Postava >Priority: Major > Labels: needs-kip, newbie++ > > In order to properly monitor individual tasks using the producer and consumer > metrics, we need to have the framework disambiguate them. Currently when we > create producers > (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L362) > and create consumers > (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L371-L394) > the client ID is not being set. You can override it for the entire worker > via worker-level producer/consumer overrides, but you can't get per-task > metrics. > There are a couple of things we might want to consider doing here: > 1. Provide default client IDs based on the worker group ID + task ID > (providing uniqueness for multiple connect clusters up to the scope of the > Kafka cluster they are operating on). This seems ideal since it's a good > default; however it is a public-facing change and may need a KIP. Normally I > would be less worried about this, but some folks may be relying on picking up > metrics without this being set, in which case such a change would break their > monitoring. > 2. Allow overriding client.id on a per-connector basis. I'm not sure if this > will really be useful or not -- it lets you differentiate between metrics for > different connectors' tasks, but within a connector, all metrics would go to > a single client.id. On the other hand, this makes the tasks act as a single > group from the perspective of broker handling of client IDs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
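Until KIP-411 lands, the worker-level override mentioned in the description is the usual workaround. A sketch, assuming the standard producer./consumer. prefixed overrides in the Connect worker properties (the id values are placeholders):
{noformat}
# connect worker configuration
producer.client.id=connect-worker-1-producer
consumer.client.id=connect-worker-1-consumer
{noformat}
As the ticket notes, this sets a single client.id for the whole worker, so the resulting metrics still cannot be broken down per task, and all tasks on the worker share the same ids.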
[jira] [Created] (KAFKA-8204) Streams may flush state stores in the incorrect order
John Roesler created KAFKA-8204: --- Summary: Streams may flush state stores in the incorrect order Key: KAFKA-8204 URL: https://issues.apache.org/jira/browse/KAFKA-8204 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Assignee: John Roesler Cached state stores may forward records during a flush call, so Streams should flush the stores in topological order. Otherwise, Streams may flush a downstream store before an upstream one, resulting in sink results being committed without the corresponding state changelog updates being committed. This behavior is partly responsible for the bug reported in KAFKA-7895 . The fix is simply to flush the stores in topological order, then when the upstream store forwards records to a downstream stateful processor, the corresponding state changes will be correctly flushed as well. An alternative would be to repeatedly call flush on all state stores until they report there is nothing left to flush, but this requires a public API change to enable state stores to report whether they need a flush or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
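The ordering requirement is easier to see as a small sketch. This is only a conceptual illustration, not the Streams internals; the FlushableStore interface and OrderedFlush helper are made up for the example:
{code:java}
import java.util.List;

interface FlushableStore {
    void flush(); // a cached store may forward records downstream while flushing
}

class OrderedFlush {
    // 'stores' is assumed to already be sorted upstream-first (topological order),
    // so anything an upstream flush forwards is still picked up by the later
    // downstream flushes before the commit.
    static void flushAll(List<FlushableStore> stores) {
        for (FlushableStore store : stores) {
            store.flush();
        }
    }
}
{code}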
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813626#comment-16813626 ] John Roesler commented on KAFKA-7895: - I'm tracking the incorrect flushing behavior in a separate ticket. This is the root cause of the continued duplicate results, even when EOS is enabled, when the Streams process undergoes an abrubt stop. > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813626#comment-16813626 ] John Roesler edited comment on KAFKA-7895 at 4/9/19 5:08 PM: - I'm tracking the incorrect flushing behavior in a separate ticket (KAFKA-8204). This is the root cause of the continued duplicate results, even when EOS is enabled, when the Streams process undergoes an abrubt stop. was (Author: vvcephei): I'm tracking the incorrect flushing behavior in a separate ticket. This is the root cause of the continued duplicate results, even when EOS is enabled, when the Streams process undergoes an abrubt stop. > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0, 2.1.2 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8205) Kafka SSL encryption of dataat rest
Niten Aggarwal created KAFKA-8205: - Summary: Kafka SSL encryption of dataat rest Key: KAFKA-8205 URL: https://issues.apache.org/jira/browse/KAFKA-8205 Project: Kafka Issue Type: Bug Components: security Affects Versions: 1.0.1 Environment: All Reporter: Niten Aggarwal Recently we enabled SSL on our kafka cluster which earlier had SASL PLAINTEXT. Everything works fine from both producer and consumer standpoint as expected with one strange behavior. We noticed data in the log file is also encrypted which we didn't thought of because SSL is meant for transport level security not to encrypt data at rest. It doesn't mean we have any issues with that but would like to understand what enables to perform encrypting data at rest. Do we have a way to:- 1) turn it off 2) Extend the encryption algorithm if company would like to use their own key management system and different algorithm. After going through Kafka docs, we realized there is a KIP already in discussion but how come it's implemented without been approved? [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8205) Kafka SSL encryption of data at rest
[ https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Niten Aggarwal updated KAFKA-8205: -- Summary: Kafka SSL encryption of data at rest (was: Kafka SSL encryption of dataat rest) > Kafka SSL encryption of data at rest > > > Key: KAFKA-8205 > URL: https://issues.apache.org/jira/browse/KAFKA-8205 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 1.0.1 > Environment: All >Reporter: Niten Aggarwal >Priority: Major > > Recently we enabled SSL on our kafka cluster which earlier had SASL > PLAINTEXT. Everything works fine from both producer and consumer standpoint > as expected with one strange behavior. We noticed data in the log file is > also encrypted which we didn't thought of because SSL is meant for transport > level security not to encrypt data at rest. > It doesn't mean we have any issues with that but would like to understand > what enables to perform encrypting data at rest. Do we have a way to:- > 1) turn it off > 2) Extend the encryption algorithm if company would like to use their own key > management system and different algorithm. > After going through Kafka docs, we realized there is a KIP already in > discussion but how come it's implemented without been approved? > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow
[ https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813657#comment-16813657 ] Jonathan Santilli commented on KAFKA-7656: -- Currently 2.2, we have updated from 2.0 recently. This error started showing up since we update to 2.2 version. > ReplicaManager fetch fails on leader due to long/integer overflow > - > > Key: KAFKA-7656 > URL: https://issues.apache.org/jira/browse/KAFKA-7656 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.1 > Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 > EDT 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: Patrick Haas >Assignee: Jose Armando Garcia Sancio >Priority: Major > > (Note: From 2.0.1-cp1 from confluent distribution) > {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error > processing fetch operation on partition __consumer_offsets-20, offset 0 > (kafka.server.ReplicaManager)}} > {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log > read from segment FileRecords(file= > /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, > start=0, end=2147483647)}} > {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}} > {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}} > {{ at kafka.log.Log.read(Log.scala:1114)}} > {{ at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}} > {{ at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}} > {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}} > {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}} > {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}} > {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}} > {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}} > {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}} > {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}} > {{ at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
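For reference, the -2147483648 in the quoted error is Integer.MIN_VALUE, which is where a signed 32-bit int lands when a value derived from Integer.MAX_VALUE (the segment's end=2147483647 above) overflows. A trivial illustration of the wraparound, not the Kafka fetch path itself:
{code:java}
public class OverflowDemo {
    public static void main(String[] args) {
        int maxSize = Integer.MAX_VALUE; // 2147483647, the segment 'end' in the error
        int bumped = maxSize + 1;        // wraps to -2147483648 (Integer.MIN_VALUE)
        System.out.println(bumped);      // same invalid max size reported by LogSegment.read
    }
}
{code}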
[jira] [Commented] (KAFKA-8205) Kafka SSL encryption of data at rest
[ https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813659#comment-16813659 ] Sriharsha Chintalapani commented on KAFKA-8205: --- [~nitena2019] This question should go to the mailing list rather than a JIRA. Kafka does not have data-at-rest encryption yet; Kafka SSL provides wire encryption only. What do you mean by your data in the logs being encrypted? Is it possible that what you are seeing is serialized data from producers? Also make sure you don't have disk encryption from the OS or another third party turned on. > Kafka SSL encryption of data at rest > > > Key: KAFKA-8205 > URL: https://issues.apache.org/jira/browse/KAFKA-8205 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 1.0.1 > Environment: All >Reporter: Niten Aggarwal >Priority: Major > > Recently we enabled SSL on our kafka cluster which earlier had SASL > PLAINTEXT. Everything works fine from both producer and consumer standpoint > as expected with one strange behavior. We noticed data in the log file is > also encrypted which we didn't thought of because SSL is meant for transport > level security not to encrypt data at rest. > It doesn't mean we have any issues with that but would like to understand > what enables to perform encrypting data at rest. Do we have a way to:- > 1) turn it off > 2) Extend the encryption algorithm if company would like to use their own key > management system and different algorithm. > After going through Kafka docs, we realized there is a KIP already in > discussion but how come it's implemented without been approved? > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow
[ https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813657#comment-16813657 ] Jonathan Santilli edited comment on KAFKA-7656 at 4/9/19 5:38 PM: -- Currently 2.2, we have updated from 2.0 recently. This error started showing up since we update to 2.2 version. [~jagsancio] was (Author: pachilo): Currently 2.2, we have updated from 2.0 recently. This error started showing up since we update to 2.2 version. > ReplicaManager fetch fails on leader due to long/integer overflow > - > > Key: KAFKA-7656 > URL: https://issues.apache.org/jira/browse/KAFKA-7656 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.0.1 > Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 > EDT 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: Patrick Haas >Assignee: Jose Armando Garcia Sancio >Priority: Major > > (Note: From 2.0.1-cp1 from confluent distribution) > {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error > processing fetch operation on partition __consumer_offsets-20, offset 0 > (kafka.server.ReplicaManager)}} > {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log > read from segment FileRecords(file= > /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, > start=0, end=2147483647)}} > {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}} > {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}} > {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}} > {{ at kafka.log.Log.read(Log.scala:1114)}} > {{ at > kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}} > {{ at > kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}} > {{ at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}} > {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}} > {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}} > {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}} > {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}} > {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}} > {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}} > {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}} > {{ at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8205) Kafka SSL encryption of data at rest
[ https://issues.apache.org/jira/browse/KAFKA-8205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813672#comment-16813672 ] Niten Aggarwal commented on KAFKA-8205: --- Hi [~sriharsha], Sure next time i will submit question in mailing list.. We are using String serializers. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Not sure about disk encryption but if i use same cluster with non ssl security protocol, we don't see data encrypted in log files. Am i missing some configuration? > Kafka SSL encryption of data at rest > > > Key: KAFKA-8205 > URL: https://issues.apache.org/jira/browse/KAFKA-8205 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 1.0.1 > Environment: All >Reporter: Niten Aggarwal >Priority: Major > > Recently we enabled SSL on our kafka cluster which earlier had SASL > PLAINTEXT. Everything works fine from both producer and consumer standpoint > as expected with one strange behavior. We noticed data in the log file is > also encrypted which we didn't thought of because SSL is meant for transport > level security not to encrypt data at rest. > It doesn't mean we have any issues with that but would like to understand > what enables to perform encrypting data at rest. Do we have a way to:- > 1) turn it off > 2) Extend the encryption algorithm if company would like to use their own key > management system and different algorithm. > After going through Kafka docs, we realized there is a KIP already in > discussion but how come it's implemented without been approved? > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
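One way to check what is actually on disk is to dump a log segment with the DumpLogSegments tool shipped with Kafka (the topic and segment path below are placeholders):
{noformat}
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
  --print-data-log
{noformat}
With StringSerializer payloads the records should print as readable text. If they do not, the likely causes are producer-side compression or OS/volume-level disk encryption rather than Kafka's SSL, which only protects data in transit.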
[jira] [Created] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted
alex gabriel created KAFKA-8206: --- Summary: A consumer can't discover new group coordinator when the cluster was partly restarted Key: KAFKA-8206 URL: https://issues.apache.org/jira/browse/KAFKA-8206 Project: Kafka Issue Type: Bug Affects Versions: 2.2.0, 2.0.0, 1.0.0 Reporter: alex gabriel *A consumer can't discover new group coordinator when the cluster was partly restarted* Preconditions: I use Kafka server and Java kafka-client lib 2.2 version I have 2 Kafka nodes running localy (localhost:9092, localhost:9093) and 1 ZK(localhost:2181/localhost:2181) I have replication factor 2 for the all my topics and '_unclean.leader.election.enable=true_' on both Kafka nodes. Steps to reproduce: 1) Start 2nodes (localhost:9092/localhost:9093) 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093' {noformat} // discovered group coordinator (0-node) 2019-04-09 16:23:18,963 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)> ...metadatacache is updated (2 nodes in the cluster list) 2019-04-09 16:23:18,928 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), localhost:9093 (id: 1 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))}> {noformat} 3) Shutdown 1-node (localhost:9093) {noformat} // metadata was updated to the 4 version (but for some reasons it still had 2 alive nodes inside cluster) 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = localhost:9092 (id: 0 rack: null))}> //consumers thinks that node-1 is still alive and try to send coordinator lookup to it but failed 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)> 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.> 2019-04-09 16:24:01,117 WARN [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
(localhost:9093) could not be established. Broker may not be available.> // refreshing metadata again 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 being disconnected> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Coordinator discovery failed, refreshing metadata> // metadata was updated to the 5 version where cluster had only 0-node localhost:9092 as expected. 2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1
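For reference, a consumer set up roughly as in the reproduction reads like the sketch below. The bootstrap servers, group id, client id and topic come from the log lines above; the deserializers and the remaining boilerplate are assumptions for illustration:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EventsConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // both brokers are listed, yet coordinator rediscovery still stalls per the report
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("group.id", "events-group-gabriel");
        props.put("client.id", "events-consumer0");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events-sorted"));
            for (int i = 0; i < 100; i++) {
                consumer.poll(Duration.ofMillis(500)); // triggers FindCoordinator as needed
            }
        }
    }
}
{code}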
[jira] [Updated] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted
[ https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] alex gabriel updated KAFKA-8206: Description: *A consumer can't discover new group coordinator when the cluster was partly restarted* Preconditions: I use Kafka server and Java kafka-client lib 2.2 version I have 2 Kafka nodes running localy (localhost:9092, localhost:9093) and 1 ZK(localhost:2181) I have replication factor 2 for the all my topics and '_unclean.leader.election.enable=true_' on both Kafka nodes. Steps to reproduce: 1) Start 2nodes (localhost:9092/localhost:9093) 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093' {noformat} // discovered group coordinator (0-node) 2019-04-09 16:23:18,963 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)> ...metadatacache is updated (2 nodes in the cluster list) 2019-04-09 16:23:18,928 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), localhost:9093 (id: 1 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))}> {noformat} 3) Shutdown 1-node (localhost:9093) {noformat} // metadata was updated to the 4 version (but for some reasons it still had 2 alive nodes inside cluster) 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = localhost:9092 (id: 0 rack: null))}> //consumers thinks that node-1 is still alive and try to send coordinator lookup to it but failed 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)> 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.> 2019-04-09 16:24:01,117 WARN [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 (localhost:9093) could not be established. 
Broker may not be available.> // refreshing metadata again 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 being disconnected> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Coordinator discovery failed, refreshing metadata> // metadata was updated to the 5 version where cluster had only 0-node localhost:9092 as expected. 2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1])], controller = localhost:9092 (id: 0 rack: null))}> // 0-node discovered as coordinator 2019-04-09 16:24:01,132 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.
[jira] [Commented] (KAFKA-8204) Streams may flush state stores in the incorrect order
[ https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813748#comment-16813748 ] ASF GitHub Bot commented on KAFKA-8204: --- vvcephei commented on pull request #6555: KAFKA-8204: fix Streams store flush order URL: https://github.com/apache/kafka/pull/6555 Streams previously flushed stores in the order of their registration, which is arbitrary. Because stores may forward values upon flush (as in cached state stores), we must flush stores in topological order. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams may flush state stores in the incorrect order > - > > Key: KAFKA-8204 > URL: https://issues.apache.org/jira/browse/KAFKA-8204 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > > Cached state stores may forward records during a flush call, so Streams > should flush the stores in topological order. Otherwise, Streams may flush a > downstream store before an upstream one, resulting in sink results being > committed without the corresponding state changelog updates being committed. > This behavior is partly responsible for the bug reported in KAFKA-7895 . > The fix is simply to flush the stores in topological order, then when the > upstream store forwards records to a downstream stateful processor, the > corresponding state changes will be correctly flushed as well. > An alternative would be to repeatedly call flush on all state stores until > they report there is nothing left to flush, but this requires a public API > change to enable state stores to report whether they need a flush or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput
[ https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813847#comment-16813847 ] ASF GitHub Bot commented on KAFKA-8200: --- pkleindl commented on pull request #6556: KAFKA-8200: added Iterator methods for output to TopologyTestDriver URL: https://github.com/apache/kafka/pull/6556 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver should offer an iterable signature of readOutput > --- > > Key: KAFKA-8200 > URL: https://issues.apache.org/jira/browse/KAFKA-8200 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Drogalis >Priority: Minor > Labels: needs-kip > > When using the TopologyTestDriver, one examines the output on a topic with > the readOutput method. This method returns one record at a time, until no > more records can be found, at which point in returns null. > Many times, the usage pattern around readOutput will involve writing a loop > to extract a number of records from the topic, building up a list of records, > until it returns null. > It would be helpful to offer an iterable signature of readOutput, which > returns either an iterator or list over the records that are currently > available in the topic. This would effectively remove the loop that a user > needs to write for him/herself each time. > Such a signature might look like: > {code:java} > public Iterable> readOutput(java.lang.String > topic); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput
[ https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813848#comment-16813848 ] Patrik Kleindl commented on KAFKA-8200: --- [~mjsax] Does this require a KIP? I took the liberty of trying this out; it seems to work fine. Feedback on the PR is welcome. > TopologyTestDriver should offer an iterable signature of readOutput > --- > > Key: KAFKA-8200 > URL: https://issues.apache.org/jira/browse/KAFKA-8200 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Drogalis >Priority: Minor > Labels: needs-kip > > When using the TopologyTestDriver, one examines the output on a topic with > the readOutput method. This method returns one record at a time, until no > more records can be found, at which point in returns null. > Many times, the usage pattern around readOutput will involve writing a loop > to extract a number of records from the topic, building up a list of records, > until it returns null. > It would be helpful to offer an iterable signature of readOutput, which > returns either an iterator or list over the records that are currently > available in the topic. This would effectively remove the loop that a user > needs to write for him/herself each time. > Such a signature might look like: > {code:java} > public Iterable> readOutput(java.lang.String > topic); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
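For context, the drain loop the ticket wants to fold into the API typically looks like the sketch below; the topic name, serdes and driver construction are assumed, and it uses the existing readOutput overload that takes deserializers:
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.TopologyTestDriver;

public class ReadAllOutput {
    // Drain everything currently available on an output topic: readOutput returns
    // one record per call and null once the topic is exhausted.
    static List<ProducerRecord<String, String>> readAll(TopologyTestDriver driver, String topic) {
        List<ProducerRecord<String, String>> records = new ArrayList<>();
        ProducerRecord<String, String> record;
        while ((record = driver.readOutput(topic, new StringDeserializer(), new StringDeserializer())) != null) {
            records.add(record);
        }
        return records;
    }
}
{code}
An Iterable-returning readOutput along the lines proposed in the description would essentially wrap this read-until-null loop.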
[jira] [Updated] (KAFKA-7897) Invalid use of epoch cache with old message format versions
[ https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-7897: --- Fix Version/s: 1.1.2 > Invalid use of epoch cache with old message format versions > --- > > Key: KAFKA-7897 > URL: https://issues.apache.org/jira/browse/KAFKA-7897 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.1, 2.1.0 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > > Message format downgrades are not supported, but they generally work as long > as broker/clients at least can continue to parse both message formats. After > a downgrade, the truncation logic should revert to using the high watermark, > but currently we use the existence of any cached epoch as the sole > prerequisite in order to leverage OffsetsForLeaderEpoch. This has the effect > of causing a massive truncation after startup which causes re-replication. > I think our options to fix this are to either 1) clear the cache when we > notice a downgrade, or 2) forbid downgrades and raise an error. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
[ https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813937#comment-16813937 ] Gene Yi commented on KAFKA-7093: It seems that an empty leader-epoch-checkpoint file for each topic will resolve this. > Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0 > > > Key: KAFKA-7093 > URL: https://issues.apache.org/jira/browse/KAFKA-7093 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.1.0 >Reporter: Suleyman >Priority: Major > > I upgraded to kafka version from 0.11.0.1 to 1.1.0. After the upgrade, I'm > getting the below warn message too much. > WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. > This implies messages have arrived out of order. New: \{epoch:0, > offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: > __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) > How can I resolve this warn messages? And why I'm getting this warn messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
[ https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813971#comment-16813971 ] Gene Yi commented on KAFKA-7093: It is indeed the bug mentioned by [~gwenshap], https://issues.apache.org/jira/browse/KAFKA-7415. After upgrading to 2.0.1, we can see the leader-epoch-checkpoint file for each topic updated to the correct values (in sync with the epoch number from ZooKeeper). > Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0 > > > Key: KAFKA-7093 > URL: https://issues.apache.org/jira/browse/KAFKA-7093 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.1.0 >Reporter: Suleyman >Priority: Major > > I upgraded to kafka version from 0.11.0.1 to 1.1.0. After the upgrade, I'm > getting the below warn message too much. > WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. > This implies messages have arrived out of order. New: \{epoch:0, > offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: > __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) > How can I resolve this warn messages? And why I'm getting this warn messages? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813990#comment-16813990 ] Dhruvil Shah commented on KAFKA-7362: - [~xiongqiwu] from my understanding, we could only have orphan partitions when an offline broker comes back online, so doing this cleanup once on startup should be sufficient. We could talk more about the implementation when you open the PR for review. Thank you for working on this! > enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from a offline broker, > those removed partitions become orphan partitions to the broker. When the > offline broker comes back online, it is not able to clean up both data and > folders that belong to orphan partitions. Log manager will scan all all dirs > during startup, but the time based retention policy on a topic partition will > not be kicked out until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partition today. > Open this ticket to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813990#comment-16813990 ] Dhruvil Shah edited comment on KAFKA-7362 at 4/10/19 2:12 AM: -- [~xiongqiwu] thank you for working on this! I agree that we need to figure out the appropriate time to initiate cleanup of orphaned partitions. Perhaps we could discuss more about the implementation after you open the PR for review. was (Author: dhruvilshah): [~xiongqiwu] from my understanding, we could only have orphan partitions when an offline broker comes back online, so doing this cleanup once on startup should be sufficient. We could talk more about the implementation when you open the PR for review. Thank you for working on this! > enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from a offline broker, > those removed partitions become orphan partitions to the broker. When the > offline broker comes back online, it is not able to clean up both data and > folders that belong to orphan partitions. Log manager will scan all all dirs > during startup, but the time based retention policy on a topic partition will > not be kicked out until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partition today. > Open this ticket to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814015#comment-16814015 ] ASF GitHub Bot commented on KAFKA-7965: --- huxihx commented on pull request #6557: KAFKA-7965: Fix testRollingBrokerRestartsWithSmallerMaxGroup failure URL: https://github.com/apache/kafka/pull/6557 https://issues.apache.org/jira/browse/KAFKA-7965 Most of the time, the group coordinator runs on broker 1. Occasionally the group coordinator will be placed on broker 2. If that's the case, the loop starting at line 320 have no chance to check and update `kickedOutConsumerIdx`. A quick fix is to safely do another round of loop to ensure `kickedOutConsumerIdx` always be checked after the last broker restart. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: huxihx >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16814078#comment-16814078 ] Di Shang commented on KAFKA-4453: - Hi, the new metric introduced here breaks our metric parser with a NaN value: "kafka.network,SocketServer,ControlPlaneNetworkProcessorAvgIdlePercent": "NaN", [https://github.com/apache/kafka/blob/2.2/core/src/main/scala/kafka/network/SocketServer.scala#L143] Is this the best default value for this metric? Can we use a concrete number instead? > add request prioritization > -- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Onur Karaman >Assignee: Mayuresh Gharat >Priority: Major > Labels: kip > Fix For: 2.2.0 > > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests in front of the controller > request to get processed based on stale state. > Side effects may include giving clients stale metadata[1], rejecting > ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] > definition of data loss in terms of messages beyond the high watermark)[4]. > We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued-up requests, so requests can get processed with up-to-date state. > [1] Say a client's MetadataRequest is sitting in front of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcast > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a roundtrip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. > [2] Clients can issue ProduceRequests to the wrong broker based on stale > metadata, causing rejected ProduceRequests. Depending on how long the client acts > on the stale metadata, the impact may or may not be visible to a > producer application. If the number of rejected ProduceRequests does not > exceed the max number of retries, the producer application would not be > impacted. On the other hand, if the retries are exhausted, the failed produce > will be visible to the producer application. > [3] The official definition of data loss in kafka is when we lose a > "committed" message. A message is considered "committed" when all in-sync > replicas for that partition have applied it to their log. > [4] Say a number of ProduceRequests are sitting in front of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. 
As > part of becoming a follower for a partition, the broker truncates the log to > the high watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest, making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. So users have a false sense that their produce attempt succeeded > while in reality their messages were erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. > KIP-291: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
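To illustrate the acks trade-off described in [4] above, the snippet below uses the standard Kafka producer API (written in Scala); the broker address and topic name are placeholders. With acks=1 the leader alone acknowledges the write, so a subsequent leadership change plus truncation can silently drop the message; acks=all waits for the in-sync replicas and avoids that particular surprise, at some latency cost.

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// "1" is the weaker setting discussed in [4]; "all" trades latency for durability.
props.put(ProducerConfig.ACKS_CONFIG, "all")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("myTopic", "key", "value")) // placeholder topic
producer.close()
{code}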
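Regarding the NaN question in the comment above: until a concrete default is chosen on the broker side, a parser-side workaround is possible. The sketch below is a hedged example of coercing non-finite gauge values to a chosen default before serialisation (JSON has no representation for NaN); the default of 0.0 is an assumption for illustration, not a recommendation from the Kafka project.

{code:scala}
// Hedged sketch of a metric-parser guard, not a change to Kafka itself.
def sanitizeGauge(value: Double, default: Double = 0.0): Double =
  if (value.isNaN || value.isInfinite) default else value

// Example: the control-plane idle-percent gauge reports NaN when no control
// plane listener is in use, so substitute the assumed default instead.
val reported = sanitizeGauge(Double.NaN) // => 0.0
{code}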