[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki updated KAFKA-6777: -- Description: Dears, We already encountered many times problems related to Out Of Memory situation in Kafka Broker and streaming clients. The scenario is the following. When Kafka Broker (or Streaming Client) is under load and has too less memory, there are no errors in server logs. One can see some cryptic entries in GC logs, but they are definitely not self-explaining. Kafka Broker (and Streaming Clients) works further. Later we see in JMX monitoring, that JVM uses more and more time in GC. In our case it grows from e.g. 1% to 80%-90% of CPU time is used by GC. Next software collapses into zombie mode – process in not ending. In such a case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse Kafka treats such a zombie process normal and somewhat sends messages, which are in fact getting lost, also the cluster is not excluding broken nodes. The question is how to configure Kafka to really terminate the JVM and not remain in zombie mode, to give a chance to other nodes to know, that something is dead. I would expect that in Out Of Memory situation JVM is ended if not graceful than at least process is crashed. was: Dears, We already encountered many times problems related to Out Of Memory situation in Kafka Broker and streaming clients. The scenario is the following. When Kafka Broker (or Streaming Client) is under load and has too less memory, there are no errors in server logs. One can see some cryptic entries in GC logs, but they are definitely not self-explaining. Kafka Broker (and Streaming Clients) works further. Later we see in JMX monitoring, that JVM uses more and more time in GC. In our case it grows from e. 1% to 80%-90% of CPU time is used by GC. Next software collapses into zombie mode – process in not ending. In such a case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse Kafka treats such a zombie process normal and somewhat sends messages, which are in fact getting lost, also the cluster is not excluding broken nodes. The question is how to configure Kafka to really terminate the JVM and not remain in zombie mode, to give a chance to other nodes to know, that something is dead. I would expect that in Out Of Memory situation JVM is ended if not graceful than at least process is crashed. > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). 
Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki updated KAFKA-6777: -- Description: Dears, We already encountered many times problems related to Out Of Memory situation in Kafka Broker and streaming clients. The scenario is the following. When Kafka Broker (or Streaming Client) is under load and has too less memory, there are no errors in server logs. One can see some cryptic entries in GC logs, but they are definitely not self-explaining. Kafka Broker (and Streaming Clients) works further. Later we see in JMX monitoring, that JVM uses more and more time in GC. In our case it grows from e.g. 1% to 80%-90% of CPU time is used by GC. Next, software collapses into zombie mode – process in not ending. In such a case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse Kafka treats such a zombie process normal and somewhat sends messages, which are in fact getting lost, also the cluster is not excluding broken nodes. The question is how to configure Kafka to really terminate the JVM and not remain in zombie mode, to give a chance to other nodes to know, that something is dead. I would expect that in Out Of Memory situation JVM is ended if not graceful than at least process is crashed. was: Dears, We already encountered many times problems related to Out Of Memory situation in Kafka Broker and streaming clients. The scenario is the following. When Kafka Broker (or Streaming Client) is under load and has too less memory, there are no errors in server logs. One can see some cryptic entries in GC logs, but they are definitely not self-explaining. Kafka Broker (and Streaming Clients) works further. Later we see in JMX monitoring, that JVM uses more and more time in GC. In our case it grows from e.g. 1% to 80%-90% of CPU time is used by GC. Next software collapses into zombie mode – process in not ending. In such a case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse Kafka treats such a zombie process normal and somewhat sends messages, which are in fact getting lost, also the cluster is not excluding broken nodes. The question is how to configure Kafka to really terminate the JVM and not remain in zombie mode, to give a chance to other nodes to know, that something is dead. I would expect that in Out Of Memory situation JVM is ended if not graceful than at least process is crashed. > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). 
Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430366#comment-16430366 ] Veera edited comment on KAFKA-6052 at 4/11/18 7:28 AM: --- Hi Jason Could you please confirm the release date of either 1.1.1. or 1.2.0 ? will wait if it happens in this month or else we take the fix and apply to 1.0.0 version. Thanks Veeru was (Author: vmallavarapu): Hi Jason Could you please confirm the release date of either 1.1.1. or 1.2.0 ? Many Thanks. > Windows: Consumers not polling when isolation.level=read_committed > --- > > Key: KAFKA-6052 > URL: https://issues.apache.org/jira/browse/KAFKA-6052 > Project: Kafka > Issue Type: Bug > Components: consumer, producer >Affects Versions: 0.11.0.0, 1.0.1 > Environment: Windows 10. All processes running in embedded mode. >Reporter: Ansel Zandegran >Assignee: Vahid Hashemian >Priority: Major > Labels: transactions, windows > Fix For: 1.2.0, 1.1.1 > > Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, > logFile.log > > > *The same code is running fine in Linux.* I am trying to send a transactional > record with exactly once schematics. These are my producer, consumer and > broker setups. > public void sendWithTTemp(String topic, EHEvent event) { > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > //props.put("bootstrap.servers", > "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092"); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); > props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); > props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); > props.put(ProducerConfig.LINGER_MS_CONFIG, 1); > props.put("transactional.id", "TID" + transactionId.incrementAndGet()); > props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000"); > Producer producer = > new KafkaProducer<>(props, > new StringSerializer(), > new StringSerializer()); > Logger.log(this, "Initializing transaction..."); > producer.initTransactions(); > Logger.log(this, "Initializing done."); > try { > Logger.log(this, "Begin transaction..."); > producer.beginTransaction(); > Logger.log(this, "Begin transaction done."); > Logger.log(this, "Sending events..."); > producer.send(new ProducerRecord<>(topic, > event.getKey().toString(), > event.getValue().toString())); > Logger.log(this, "Sending events done."); > Logger.log(this, "Committing..."); > producer.commitTransaction(); > Logger.log(this, "Committing done."); > } catch (ProducerFencedException | OutOfOrderSequenceException > | AuthorizationException e) { > producer.close(); > e.printStackTrace(); > } catch (KafkaException e) { > producer.abortTransaction(); > e.printStackTrace(); > } > producer.close(); > } > *In Consumer* > I have set isolation.level=read_committed > *In 3 Brokers* > I'm running with the following properties > Properties props = new Properties(); > props.setProperty("broker.id", "" + i); > props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i)); > props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + > i); > props.setProperty("num.partitions", "1"); > props.setProperty("zookeeper.connect", "localhost:2181"); > props.setProperty("zookeeper.connection.timeout.ms", "6000"); > props.setProperty("min.insync.replicas", "2"); > props.setProperty("offsets.topic.replication.factor", "2"); > props.setProperty("offsets.topic.num.partitions", "1"); > 
props.setProperty("transaction.state.log.num.partitions", "2"); > props.setProperty("transaction.state.log.replication.factor", "2"); > props.setProperty("transaction.state.log.min.isr", "2"); > I am not getting any records in the consumer. When I set > isolation.level=read_uncommitted, I get the records. I assume that the > records are not getting commited. What could be the problem? log attached -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433723#comment-16433723 ] Jagadesh Adireddi commented on KAFKA-6677: -- Hi [~mjsax], I would like to contribute to this issue. As per the ticket description, would removing ` tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);` from the class StreamsConfig resolve the issue, or do I need to dig deeper for the fix? Kindly provide your inputs. > Remove EOS producer config max.in.flight.request.per.connection=1 > - > > Key: KAFKA-6677 > URL: https://issues.apache.org/jira/browse/KAFKA-6677 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > When EOS was introduced in 0.11, it was required to set producer config > max.in.flight.requests.per.connection=1 for idempotent producer. > This limitations as fixed in 1.0 release via KAFKA-5494 > Thus, we should remove this setting in Kafka Streams if EOS get's enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433726#comment-16433726 ] Mickael Maison commented on KAFKA-6777: --- You usually do that via JVM options. For example, on Oracle's JVM you can use: -XX:OnOutOfMemoryError="<cmd args>;<cmd args>" [http://www.oracle.com/technetwork/java/javase/tech/vmoptions-jsp-140102.html] If you set it in KAFKA_OPTS, it will be picked up automatically by the tools under /bin > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
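For illustration, wiring this up through KAFKA_OPTS could look roughly like the snippet below; the exact command, flag choice and paths are assumptions, not from the thread:
{code}
# Run a command when the JVM hits an OutOfMemoryError (quote carefully when the command contains spaces)
export KAFKA_OPTS='-XX:OnOutOfMemoryError="kill -9 %p"'
# Simpler alternative on Java 8u92 or newer: terminate the JVM directly on OOM
# export KAFKA_OPTS="-XX:+ExitOnOutOfMemoryError"
bin/kafka-server-start.sh config/server.properties
{code}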
[jira] [Created] (KAFKA-6778) DescribeConfigs does not return error for non-existent topic
Magnus Edenhill created KAFKA-6778: -- Summary: DescribeConfigs does not return error for non-existent topic Key: KAFKA-6778 URL: https://issues.apache.org/jira/browse/KAFKA-6778 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 1.1.0 Reporter: Magnus Edenhill Sending a DescribeConfigsRequest with a ConfigResource(TOPIC, "non-existent-topic") returns a fully populated ConfigResource back in the response with 24 configuration entries. A resource-level error_code of UnknownTopic.. would be expected instead. {code:java} [0081_admin / 1.143s] ConfigResource #0: type TOPIC (2), "rdkafkatest_rnd3df408bf5d94d696_DescribeConfigs_notexist": 24 ConfigEntries, error NO_ERROR () [0081_admin / 1.144s] #0/24: Source UNKNOWN (5): "compression.type"="producer" [is read-only=n, default=n, sensitive=n, synonym=n] with 1 synonym(s) {code} But the topic does not exist: {code:java} $ $KAFKA_PATH/bin/kafka-topics.sh --zookeeper $ZK_ADDRESS --list | grep rdkafkatest_rnd3df408bf5d94d696_DescribeConfigs_notexist ; echo $? 1 {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
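A minimal AdminClient reproduction of the check described above might look like this sketch; the broker address and topic name are assumptions:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeMissingTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "non-existent-topic");
            // Per this report the future completes with 24 default entries instead of
            // failing with an unknown-topic error.
            Config config = admin.describeConfigs(Collections.singleton(topic)).all().get().get(topic);
            System.out.println(config.entries().size() + " config entries returned");
        }
    }
}
{code}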
[jira] [Updated] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki updated KAFKA-6777: -- Attachment: screenshot-1.png > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799 ] Seweryn Habdank-Wojewodzki commented on KAFKA-6777: --- Thanks for comment. The problem is, that either the OnOutOfMemoryError is never thrown, as the algorithms trying to do their best and they are loading GC, and then no message processing may happen. Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {} The observed bahaviour is that at INFO level logs there is no explicit error like: OnOutOfMemoryError. I had seen in JMX metrics and there heap is out and GC is endless busy, till nothing is also to JMX reported. I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher than 40% or so, but this kind of tool is workaround and not a solution for the problem. I am attaching graphs to highlight wat had happend. !screenshot-1.png! > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:14 PM: - Thanks for comment. The problem is, that either the OnOutOfMemoryError is never thrown, as the algorithms trying to do their best and they are loading GC, and then no message processing may happen. Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {} The observed bahaviour is that at INFO level logs there is no explicit error like: OnOutOfMemoryError. I had seen in JMX metrics and there heap is out and GC is endless busy, till nothing is also to JMX reported. I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher than 40% or so, but this kind of tool is workaround and not a solution for the problem. I am attaching graphs to highlight wat had happend. On the image blow there are metrics from 2 Kafka nodes. The green one was dead/zombie when GC time reached 80%. This "drop" of value is only a presentation matter. !screenshot-1.png! was (Author: habdank): Thanks for comment. The problem is, that either the OnOutOfMemoryError is never thrown, as the algorithms trying to do their best and they are loading GC, and then no message processing may happen. Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {} The observed bahaviour is that at INFO level logs there is no explicit error like: OnOutOfMemoryError. I had seen in JMX metrics and there heap is out and GC is endless busy, till nothing is also to JMX reported. I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher than 40% or so, but this kind of tool is workaround and not a solution for the problem. I am attaching graphs to highlight wat had happend. !screenshot-1.png! > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433799#comment-16433799 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:17 PM: - Thanks for the comment. The problem is that either the OutOfMemoryError is never thrown, because the algorithms keep trying to do their best and are loading the GC, so eventually no message processing can happen since most of the CPU is used by GC; or the OutOfMemoryError is thrown, but caught in code like catch(Throwable) {}. The observed behaviour is that at INFO level there is no explicit error like OutOfMemoryError in the logs. I had seen in the JMX metrics that the heap is exhausted and the GC is endlessly busy, until nothing is reported to JMX anymore. I could write a tool to reboot a Kafka node when the GC load on CPU is higher than 40% or so, but this kind of tool is a workaround and not a solution for the problem. I am attaching graphs to highlight what had happened. On the image below there are metrics from 2 Kafka nodes. The green one was dead/zombie when GC time reached 80%. This "drop" of the value is only a presentation matter. !screenshot-1.png! was (Author: habdank): Thanks for comment. The problem is, that either the OnOutOfMemoryError is never thrown, as the algorithms trying to do their best and they are loading GC, and then no message processing may happen. Or the OnOutOfMemoryError is thrown, but caught in code like catch(Throwable) {} The observed bahaviour is that at INFO level logs there is no explicit error like: OnOutOfMemoryError. I had seen in JMX metrics and there heap is out and GC is endless busy, till nothing is also to JMX reported. I mean I can write a tool to reboot Kafka node, when GC load on CPU is higher than 40% or so, but this kind of tool is workaround and not a solution for the problem. I am attaching graphs to highlight wat had happend. On the image blow there are metrics from 2 Kafka nodes. The green one was dead/zombie when GC time reached 80%. This "drop" of value is only a presentation matter. !screenshot-1.png! > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. 
> I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
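The workaround mentioned in the comment (restart a node once GC dominates CPU time) could be sketched roughly as below. The 40% threshold, polling interval and in-process placement are assumptions; for a broker the same MXBeans would be read remotely over JMX, and the commenter explicitly calls this a workaround rather than a fix:
{code:java}
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcWatchdog {
    public static void main(String[] args) throws InterruptedException {
        long lastGcMillis = 0;
        long lastWallMillis = System.currentTimeMillis();
        while (true) {
            Thread.sleep(10_000);
            long gcMillis = 0;
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans())
                gcMillis += gc.getCollectionTime(); // cumulative milliseconds spent in GC
            long now = System.currentTimeMillis();
            double gcShare = (double) (gcMillis - lastGcMillis) / (now - lastWallMillis);
            if (gcShare > 0.4) { // roughly "GC uses more than 40% of the time"
                System.err.println("GC share " + gcShare + " above threshold, terminating JVM");
                System.exit(1); // let an external supervisor restart the process
            }
            lastGcMillis = gcMillis;
            lastWallMillis = now;
        }
    }
}
{code}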
[jira] [Commented] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433863#comment-16433863 ] Seweryn Habdank-Wojewodzki commented on KAFKA-6777: --- One more comment. I see quite often in Kafka that Throwable is converted to RuntimeException. This kind of code may lead to situation when OOM will never appear. I had made simple example: {code:java} public class Main { public static void main ( String[] args ) { try { try { throw new OutOfMemoryError(); // very often in Kafka code: } catch ( Throwable t ) { throw ( RuntimeException ) t; } // end of very often } catch ( Exception ignore ) { } } } {code} Executed with: {code:java} -XX:OnOutOfMemoryError="echo OOM" {code} leads to: {code:java} Process finished with exit code 0 {code} I see no *OOM* string, also no _OutOfMemoryError_ is noticed, by any stactrace. > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6777) Wrong reaction on Out Of Memory situation
[ https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433863#comment-16433863 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-6777 at 4/11/18 12:58 PM: - One more comment. I quite often see in Kafka code that a Throwable is converted to a RuntimeException. This kind of code can lead to a situation where the OOM never surfaces. I made a simple example: {code:java} public class Main { public static void main ( String[] args ) { try { try { throw new OutOfMemoryError(); // very often in Kafka code: } catch ( Throwable t ) { throw ( RuntimeException ) t; } // end of very often } catch ( Exception ignore ) { } } } {code} Executed with: {code:java} -XX:OnOutOfMemoryError="echo OOM" {code} leads to: {code:java} Process finished with exit code 0 {code} I see no *OOM* string, and no _OutOfMemoryError_ is reported in any stack trace. Generally all Errors derived from {{java.lang.Error}} are swallowed, including: * InternalError, * OutOfMemoryError, * StackOverflowError, * UnknownError, * ThreadDeath, * IOError was (Author: habdank): One more comment. I see quite often in Kafka that Throwable is converted to RuntimeException. This kind of code may lead to situation when OOM will never appear. I had made simple example: {code:java} public class Main { public static void main ( String[] args ) { try { try { throw new OutOfMemoryError(); // very often in Kafka code: } catch ( Throwable t ) { throw ( RuntimeException ) t; } // end of very often } catch ( Exception ignore ) { } } } {code} Executed with: {code:java} -XX:OnOutOfMemoryError="echo OOM" {code} leads to: {code:java} Process finished with exit code 0 {code} I see no *OOM* string, also no _OutOfMemoryError_ is noticed, by any stactrace. > Wrong reaction on Out Of Memory situation > - > > Key: KAFKA-6777 > URL: https://issues.apache.org/jira/browse/KAFKA-6777 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Critical > Attachments: screenshot-1.png > > > Dears, > We already encountered many times problems related to Out Of Memory situation > in Kafka Broker and streaming clients. > The scenario is the following. > When Kafka Broker (or Streaming Client) is under load and has too less > memory, there are no errors in server logs. One can see some cryptic entries > in GC logs, but they are definitely not self-explaining. > Kafka Broker (and Streaming Clients) works further. Later we see in JMX > monitoring, that JVM uses more and more time in GC. In our case it grows from > e.g. 1% to 80%-90% of CPU time is used by GC. > Next, software collapses into zombie mode – process in not ending. In such a > case I would expect, that process is crashing (e.g. got SIG SEGV). Even worse > Kafka treats such a zombie process normal and somewhat sends messages, which > are in fact getting lost, also the cluster is not excluding broken nodes. The > question is how to configure Kafka to really terminate the JVM and not remain > in zombie mode, to give a chance to other nodes to know, that something is > dead. > I would expect that in Out Of Memory situation JVM is ended if not graceful > than at least process is crashed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
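A pattern that avoids the swallowing described above could look like this; it is a generic sketch, not actual Kafka code:
{code:java}
public class SafeWrap {
    static void runWrapped(Runnable work) {
        try {
            work.run();
        } catch (Error fatal) {
            throw fatal; // never convert OutOfMemoryError and friends into RuntimeException
        } catch (Throwable t) {
            throw new RuntimeException(t); // wrap instead of casting, so the original cause is preserved
        }
    }

    public static void main(String[] args) {
        // The error now propagates and terminates the JVM with a visible stack trace.
        runWrapped(() -> { throw new OutOfMemoryError(); });
    }
}
{code}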
[jira] [Assigned] (KAFKA-6760) responses not logged properly in controller
[ https://issues.apache.org/jira/browse/KAFKA-6760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-6760: - Assignee: Mickael Maison > responses not logged properly in controller > --- > > Key: KAFKA-6760 > URL: https://issues.apache.org/jira/browse/KAFKA-6760 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.1.0 >Reporter: Jun Rao >Assignee: Mickael Maison >Priority: Major > Labels: newbie > > Saw the following logging in controller.log. We need to log the > StopReplicaResponse properly in KafkaController. > [2018-04-05 14:38:41,878] DEBUG [Controller id=0] Delete topic callback > invoked for org.apache.kafka.common.requests.StopReplicaResponse@263d40c > (kafka.controller.K > afkaController) > It seems that the same issue exists for LeaderAndIsrResponse as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6752) Unclean leader election metric no longer working
[ https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434271#comment-16434271 ] ASF GitHub Bot commented on KAFKA-6752: --- junrao closed pull request #4838: KAFKA-6752: Enable unclean leader election metric URL: https://github.com/apache/kafka/pull/4838 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 74bc59faee2..6805e321393 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -362,7 +362,7 @@ class PartitionStateMachine(config: KafkaConfig, if (leaderIsrAndControllerEpochOpt.nonEmpty) { val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr -val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled) +val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext) val newLeaderAndIsrOpt = leaderOpt.map { leader => val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition)) else List(leader) @@ -435,10 +435,13 @@ class PartitionStateMachine(config: KafkaConfig, } object PartitionLeaderElectionAlgorithms { - def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = { + def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = { assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse { if (uncleanLeaderElectionEnabled) { -assignment.find(liveReplicas.contains) +val leaderOpt = assignment.find(liveReplicas.contains) +if (!leaderOpt.isEmpty) + controllerContext.stats.uncleanLeaderElectionRate.mark() +leaderOpt } else { None } diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala index f149fc93a49..113a39d5430 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala @@ -17,10 +17,17 @@ package kafka.controller import org.junit.Assert._ -import org.junit.Test +import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { + private var controllerContext: ControllerContext = null + + @Before + def setUp(): Unit = { +controllerContext = new ControllerContext +controllerContext.stats.removeMetric("UncleanLeaderElectionsPerSec") + } @Test def testOfflinePartitionLeaderElection(): Unit = { @@ -30,7 +37,8 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, 
liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(Option(4), leaderOpt) } @@ -42,9 +50,12 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = false) + uncleanLeaderElectionEnabled = false, + controllerContext) assertEquals(None, leaderOpt) +assertEquals(0, controllerContext.stats.uncleanLeaderElectionRate.count()) } + @Test def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = { val assignment = Seq(2, 4) @@ -53,8 +64,10 @@ class PartitionLeaderElectionAlgorithmsTest extends JUnitSuite { val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas, - uncleanLeaderElectionEnabled = true) + uncleanLeaderElectionEnabled = true, + controllerContext) assertEquals(Option(4), leaderOpt) +assertEquals(1, controllerC
[jira] [Resolved] (KAFKA-6752) Unclean leader election metric no longer working
[ https://issues.apache.org/jira/browse/KAFKA-6752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6752. Resolution: Fixed Fix Version/s: 1.1.1 2.0.0 Merged to trunk and 1.1. > Unclean leader election metric no longer working > > > Key: KAFKA-6752 > URL: https://issues.apache.org/jira/browse/KAFKA-6752 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 1.1.0 >Reporter: Jason Gustafson >Assignee: Manikumar >Priority: Major > Fix For: 2.0.0, 1.1.1 > > > Happened to notice that the unclean leader election meter is no longer being > updated. This was probably lost during the controller overhaul. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6447) Add Delegation Token Operations to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434307#comment-16434307 ] ASF GitHub Bot commented on KAFKA-6447: --- junrao closed pull request #4427: KAFKA-6447: Add Delegation Token Operations to KafkaAdminClient (KIP-249) URL: https://github.com/apache/kafka/pull/4427 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index f83698050ed..69f560ec38f 100644 --- a/build.gradle +++ b/build.gradle @@ -858,6 +858,7 @@ project(':clients') { include "**/org/apache/kafka/common/config/*" include "**/org/apache/kafka/common/security/auth/*" include "**/org/apache/kafka/server/policy/*" +include "**/org/apache/kafka/common/security/token/delegation/*" } } diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 0fec810a95b..2767132886d 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -10,7 +10,7 @@ + files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient|AdminClient).java"/> re */ public abstract DeleteRecordsResult deleteRecords(Map recordsToDelete, DeleteRecordsOptions options); + +/** + * Create a Delegation Token. + * + * This is a convenience method for {@link #createDelegationToken(CreateDelegationTokenOptions)} with default options. + * See the overload for more details. + * + * @return The CreateDelegationTokenResult. + */ +public CreateDelegationTokenResult createDelegationToken() { +return createDelegationToken(new CreateDelegationTokenOptions()); +} + + +/** + * Create a Delegation Token. + * + * This operation is supported by brokers with version 1.1.0 or higher. + * + * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link CreateDelegationTokenResult#delegationToken() delegationToken()} method of the returned {@code CreateDelegationTokenResult} + * + * {@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels. + * {@link org.apache.kafka.common.errors.InvalidPrincipalTypeException} + * if the renewers principal type is not supported. + * {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled. + * {@link org.apache.kafka.common.errors.TimeoutException} + * if the request was not completed in within the given {@link CreateDelegationTokenOptions#timeoutMs()}. + * + * + * @param options The options to use when creating delegation token. + * @return The DeleteRecordsResult. + */ +public abstract CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options); + + +/** + * Renew a Delegation Token. + * + * This is a convenience method for {@link #renewDelegationToken(byte[], RenewDelegationTokenOptions)} with default options. + * See the overload for more details. + * + * + * @param hmac HMAC of the Delegation token + * @return The RenewDelegationTokenResult. + */ +public RenewDelegationTokenResult renewDelegationToken(byte[] hmac) { +return renewDelegationToken(hmac, new RenewDelegationTokenOptions()); +} + +/** + * Renew a Delegation Token. 
+ * + * This operation is supported by brokers with version 1.1.0 or higher. + * + * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the + * {@link RenewDelegationTokenResult#expiryTimestamp() expiryTimestamp()} method of the returned {@code RenewDelegationTokenResult} + * + * {@link org.apache.kafka.common.errors.UnsupportedByAuthenticationException} + * If the request sent on PLAINTEXT/1-way SSL channels or delegation token authenticated channels. + * {@link org.apache.kafka.common.errors.DelegationTokenDisabledException} + * if the delegation token feature is disabled. + * {@link org.apache.kafka.common.errors.DelegationTokenNotFoundException} + * if the delegation token is not found on server. + * {@link org.apache.kafka.common.errors.DelegationTokenOwnerMismatchExcep
[jira] [Resolved] (KAFKA-6447) Add Delegation Token Operations to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-6447. Resolution: Fixed Fix Version/s: 2.0.0 Merged the PR to trunk. > Add Delegation Token Operations to KafkaAdminClient > --- > > Key: KAFKA-6447 > URL: https://issues.apache.org/jira/browse/KAFKA-6447 > Project: Kafka > Issue Type: Sub-task >Reporter: Manikumar >Assignee: Manikumar >Priority: Major > Fix For: 2.0.0 > > > This JIRA is about adding delegation token operations to the new Admin Client > API. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-249%3A+Add+Delegation+Token+Operations+to+KafkaAdminClient -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool
[ https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434325#comment-16434325 ] ASF GitHub Bot commented on KAFKA-6611: --- guozhangwang opened a new pull request #4854: KAFKA-6611, PART II: Improve Streams SimpleBenchmark URL: https://github.com/apache/kafka/pull/4854 1. SimpleBenchmark: 1.a Do not rely on manual num.records / bytes collection on atomic integers. 1.b Rely on config files for num.threads, bootstrap.servers, etc. 1.c Add parameters for key skewness and value size. 1.d Refactor the tests for loading phase, adding tumbling-windowed count. 1.e For consumer / consumeproduce, collect metrics on consumer instead. 1.f Force stop the test after 3 minutes, this is based on empirical numbers of 10M records. 2. Other tests: use config for kafka bootstrap servers. 3. streams_simple_benchmark.py: only use scale 1 for system test, remove yahoo from benchmark tests. Note that the JMX based metrics is more accurate than the manually collected metrics. For example: ``` Module: kafkatest.benchmarks.streams.streams_simple_benchmark_test Class: StreamsSimpleBenchmarkTest Method: test_simple_benchmark Arguments: { "scale": 3, "test": "streamcount" } { "Streams Count Performance [records/latency/rec-sec/MB-sec counted]0": " 3691724/180042/20504.79332600171/20.64996886468678\n", "Streams Count Performance [records/latency/rec-sec/MB-sec counted]1": " 3337273/180037/18536.595255419717/18.667835797999594\n", "Streams Count Performance [records/latency/rec-sec/MB-sec counted]2": " 2971003/180029/16502.913419504635/16.61975533580484\n", "jmx-avg0": { "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg": 473.9851851851854, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate": 0.03802973633453574, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg": 1.6337178935425791, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate": 41.49027662118575, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg": 0.0021335594038200878, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate": 21042.80038005481 }, "jmx-avg1": { "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg": 354.08056, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate": 0.030462783608341253, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg": 2.2233941132541286, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate": 35.82136735561879, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg": 0.002402570337120884, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate": 19100.53692641491 }, "jmx-avg2": { "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg": 365.85, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate": 0.03052173041795845, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg": 1.1700956516025365, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate": 28.06739395694291, 
"kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg": 0.0016847557478484937, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate": 15594.917035789254 }, "jmx-max0": { "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg": 547.0, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-rate": 0.06657567990413102, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-latency-avg": 13.04, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:poll-rate": 196.4955857339561, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-latency-avg": 0.04672288495817908, "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:process-rate": 99520.53241178217 }, "jmx-max1": { "kafka.streams:type=stream-metrics,client-id=simple-benchmark-StreamThread-1:commit-latency-avg": 65
[jira] [Commented] (KAFKA-6611) Re-write simple benchmark in system tests with JMXTool
[ https://issues.apache.org/jira/browse/KAFKA-6611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434326#comment-16434326 ] ASF GitHub Bot commented on KAFKA-6611: --- guozhangwang closed pull request #4744: KAFKA-6611, PART II: Improve Streams SimpleBenchmark URL: https://github.com/apache/kafka/pull/4744 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 3a9cbc4d664..fc4745d8c6e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -832,6 +832,7 @@ public long maybeUpdate(long now) { long waitForMetadataFetch = this.metadataFetchInProgress ? requestTimeoutMs : 0; long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch); + if (metadataTimeout > 0) { return metadataTimeout; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 0514c995635..6a108269f9a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -252,6 +252,9 @@ private long sendProducerData(long now) { // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); + +log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); + this.metadata.requestUpdate(); } @@ -557,9 +560,13 @@ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED) failBatch(batch, response, exception, batch.attempts() < this.retries); } if (error.exception() instanceof InvalidMetadataException) { -if (error.exception() instanceof UnknownTopicOrPartitionException) +if (error.exception() instanceof UnknownTopicOrPartitionException) { log.warn("Received unknown topic or partition error in produce request on partition {}. The " + "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition); +} else { +log.warn("Received invalid metadata error in produce request on partition {} due to {}. 
Going " + +"to request metadata update now", batch.topicPartition, error.exception().toString()); +} metadata.requestUpdate(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java index c66d78b7310..423184c5bdf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java @@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; @@ -48,9 +49,7 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.test.TestUtils; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -59,7 +58,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** * Class that provides support for a series of benchmarks. It is usually driven by @@ -77,114 +75,125 @@ * is still running "consume" */ public class SimpleBenchmark { +private static final String LOADING_PRODUCER_CLIENT_ID = "simple-benchmark-loading-producer"; -final String kafka; -final Boolean loadPhase; -final String testName; -final int numThreads; -final Properties props; -static final String ALL_TESTS = "all"; -private static final String SOURCE_TOPIC = "simpleBenchmarkSour
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434331#comment-16434331 ] Matthias J. Sax commented on KAFKA-6677: Thanks for picking this up. I think what you suggest should be sufficient. > Remove EOS producer config max.in.flight.request.per.connection=1 > - > > Key: KAFKA-6677 > URL: https://issues.apache.org/jira/browse/KAFKA-6677 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > When EOS was introduced in 0.11, it was required to set producer config > max.in.flight.requests.per.connection=1 for idempotent producer. > This limitations as fixed in 1.0 release via KAFKA-5494 > Thus, we should remove this setting in Kafka Streams if EOS get's enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434338#comment-16434338 ] Ismael Juma commented on KAFKA-6677: There is a limit of 5 now btw. > Remove EOS producer config max.in.flight.request.per.connection=1 > - > > Key: KAFKA-6677 > URL: https://issues.apache.org/jira/browse/KAFKA-6677 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > When EOS was introduced in 0.11, it was required to set producer config > max.in.flight.requests.per.connection=1 for idempotent producer. > This limitations as fixed in 1.0 release via KAFKA-5494 > Thus, we should remove this setting in Kafka Streams if EOS get's enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6744) MockProducer with transaction enabled doesn't fail on commit if a record was failed
[ https://issues.apache.org/jira/browse/KAFKA-6744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434357#comment-16434357 ] Pascal Gélinas commented on KAFKA-6744: --- Yeah I could, it's been on the back of my mind but I've been busy with trying to understand/implement transactions on our current setup. When I get back to this test, probably next week, I'll be able to provide a quick PR for this. > MockProducer with transaction enabled doesn't fail on commit if a record was > failed > --- > > Key: KAFKA-6744 > URL: https://issues.apache.org/jira/browse/KAFKA-6744 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.0.0 >Reporter: Pascal Gélinas >Priority: Minor > > The KafkaProducer#send documentation states the following: > When used as part of a transaction, it is not necessary to define a callback > or check the result of the future in order to detect errors from send. If any > of the send calls failed with an irrecoverable error, the final > commitTransaction() call will fail and throw the exception from the last > failed send. > So I was expecting the following to throw an exception: > {{*MockProducer* producer = new MockProducer<>(false,}} > {{ new StringSerializer(), new ByteArraySerializer());}} > {{producer.initTransactions();}} > {{producer.beginTransaction();}} > {{producer.send(new ProducerRecord<>("foo", new byte[]{}));}} > {{producer.errorNext(new RuntimeException());}} > {{producer.commitTransaction(); // Expecting this to throw}} > Unfortunately, the commitTransaction() call returns successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
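For reference, the expectation in the report can be written as a JUnit-style test against the public MockProducer API; this is only a sketch (the class and test names are made up for illustration) and it should pass once MockProducer mirrors the documented KafkaProducer behaviour of rethrowing the last failed send from commitTransaction().
{code:java}
import static org.junit.Assert.fail;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

public class MockProducerTransactionTest {

    // Sketch of the regression test implied by the report: once MockProducer follows
    // KafkaProducer's contract, commitTransaction() should rethrow the error injected
    // via errorNext() instead of completing silently.
    @Test
    public void commitShouldFailAfterFailedSend() {
        MockProducer<String, byte[]> producer =
            new MockProducer<>(false, new StringSerializer(), new ByteArraySerializer());
        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("foo", new byte[]{}));
        producer.errorNext(new RuntimeException("simulated send failure"));
        try {
            producer.commitTransaction();
            fail("commitTransaction() should throw after a failed send inside the transaction");
        } catch (RuntimeException expected) {
            // expected once MockProducer matches the KafkaProducer documentation
        }
    }
}
{code}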
[jira] [Updated] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init
[ https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6775: --- Component/s: streams > AbstractProcessor created in SimpleBenchmark should call super#init > --- > > Key: KAFKA-6775 > URL: https://issues.apache.org/jira/browse/KAFKA-6775 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Priority: Minor > Labels: easy-fix, newbie > > Around line 610: > {code} > return new AbstractProcessor() { > @Override > public void init(ProcessorContext context) { > } > {code} > super.init should be called above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
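As a sketch of the fix the ticket asks for: an anonymous AbstractProcessor in the SimpleBenchmark style whose init() forwards to super.init(). The surrounding class, key/value types and field names below are illustrative only, not the benchmark's actual code.
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class SuperInitExample {

    // Illustrative count-and-forward processor; the important part is that init()
    // calls super.init(context) so that context() returns a usable context later.
    public static Processor<Integer, byte[]> countingProcessor() {
        return new AbstractProcessor<Integer, byte[]>() {
            private long processed = 0L;

            @Override
            public void init(final ProcessorContext context) {
                super.init(context);   // the call the ticket says is missing
            }

            @Override
            public void process(final Integer key, final byte[] value) {
                processed++;
                context().forward(key, value);   // works only because super.init() stored the context
            }
        };
    }
}
{code}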
[jira] [Updated] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init
[ https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6775: --- Labels: easy-fix newbie (was: ) > AbstractProcessor created in SimpleBenchmark should call super#init > --- > > Key: KAFKA-6775 > URL: https://issues.apache.org/jira/browse/KAFKA-6775 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Priority: Minor > Labels: easy-fix, newbie > > Around line 610: > {code} > return new AbstractProcessor() { > @Override > public void init(ProcessorContext context) { > } > {code} > super.init should be called above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1
[ https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434363#comment-16434363 ] Matthias J. Sax commented on KAFKA-6677: Thanks for pointing out! So we might actually set the default to 5, but allow users to configure a smaller value if they wish, but throw an exception if they configure a larger value. > Remove EOS producer config max.in.flight.request.per.connection=1 > - > > Key: KAFKA-6677 > URL: https://issues.apache.org/jira/browse/KAFKA-6677 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > When EOS was introduced in 0.11, it was required to set producer config > max.in.flight.requests.per.connection=1 for idempotent producer. > This limitations as fixed in 1.0 release via KAFKA-5494 > Thus, we should remove this setting in Kafka Streams if EOS get's enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
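A rough sketch of the validation rule described in that comment, assuming a hypothetical helper inside the Streams producer-config wiring; the class and method names are not the actual implementation, only the "default 5, smaller allowed, larger rejected" behaviour is taken from the discussion above.
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;

final class EosProducerConfigCheck {
    private static final int MAX_IN_FLIGHT_LIMIT_FOR_IDEMPOTENCE = 5;

    // Applies the rule sketched in the comment to the producer properties Streams builds
    // when exactly-once processing is enabled.
    static void applyMaxInFlightRule(final Map<String, Object> producerProps) {
        final Object configured = producerProps.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
        if (configured == null) {
            // no user override: use the largest value the idempotent producer supports
            producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                MAX_IN_FLIGHT_LIMIT_FOR_IDEMPOTENCE);
        } else if (Integer.parseInt(configured.toString()) > MAX_IN_FLIGHT_LIMIT_FOR_IDEMPOTENCE) {
            throw new ConfigException(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, configured,
                "must be <= " + MAX_IN_FLIGHT_LIMIT_FOR_IDEMPOTENCE + " when exactly-once processing is enabled");
        }
        // smaller user-supplied values are left untouched
    }
}
{code}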
[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434478#comment-16434478 ] John Roesler commented on KAFKA-5697: - [~guozhang] From your problem description, it seems like the solution isn't to make shutdown more aggressive, but to make consumer.poll not block forever. WDYT? > StreamThread.shutdown() need to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > > In {{StreamThread.shutdown()}} we currently do nothing but set the state, > hoping the stream thread may eventually check it and shutdown itself. > However, under certain scenarios the thread may get blocked within a single > loop and hence will never check on this state enum. For example, it's > {{consumer.poll}} call trigger {{ensureCoordinatorReady()}} which will block > until the coordinator can be found. If the coordinator broker is never up and > running then the Stream instance will be blocked forever. > A simple way to produce this issue is to start the work count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
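For context, the usual application-level escape hatch for a poll() that can block indefinitely is to call consumer.wakeup() from the shutting-down thread, which makes the blocked poll (including its coordinator lookup) throw WakeupException. A minimal sketch, with illustrative names rather than StreamThread's actual structure:
{code:java}
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class PollLoop {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public PollLoop(final KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void runLoop() {
        try {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (running.get()) {
                final ConsumerRecords<String, String> records = consumer.poll(100);
                // process records ...
            }
        } catch (final WakeupException e) {
            // expected on shutdown; fall through to close()
        } finally {
            consumer.close();
        }
    }

    // Safe to call from another thread: breaks a poll() that is blocked on the coordinator.
    public void shutdown() {
        running.set(false);
        consumer.wakeup();
    }
}
{code}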
[jira] [Created] (KAFKA-6779) Kafka connect JMX metrics returns inconsistent metrics while running multiple connectors
Sairam Polavarapu created KAFKA-6779: Summary: Kafka connect JMX metrics returns inconsistent metrics while running multiple connectors Key: KAFKA-6779 URL: https://issues.apache.org/jira/browse/KAFKA-6779 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.0.1, 1.1.0, 1.0.0 Environment: Linux Reporter: Sairam Polavarapu Kafka Connect's JMX metrics return different values across runs. We are running Kafka Connect in distributed mode on a 3-node cluster, which lets us submit connectors through REST API calls. In one of our use cases we run the MySQL Source Connector (Confluent) and the Amazon S3 Sink Connector (Confluent) as a pipeline and need the counts of records received by the source and delivered to the sink. The connectors are submitted one after the other, and what we observed is that when a new connector is submitted via the REST call, the existing connector's metrics are zeroed and restarted. That is expected when the connectors themselves are restarted, but in our case the source connector was not restarted. We observed the same behavior with Kafka 1.0.0 and 1.0.1, and the issue persists even after the fixes released in Kafka 1.1.0. As a side note, when the metrics were tested on a local machine with Kafka Connect in distributed mode on a single instance, the counts matched between source and sink. The issue is only in how JMX reports the data; the actual data handled by the connectors is not affected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
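A minimal sketch of how such counters can be sampled over JMX. The MBean and attribute names below follow the Connect task metrics added in 1.0, but treat them as assumptions: adjust the connector/task names to your deployment, and use a remote JMX connection if the probe does not run inside the worker JVM.
{code:java}
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ConnectTaskMetricProbe {
    public static void main(final String[] args) throws Exception {
        // Assumes this runs inside the Connect worker JVM; for a remote worker, attach via JMXConnector instead.
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // MBean name pattern and attribute are assumptions based on the 1.0 Connect metrics naming.
        final ObjectName name = new ObjectName(
            "kafka.connect:type=source-task-metrics,connector=mysql-source,task=0");
        final Object polled = server.getAttribute(name, "source-record-poll-total");
        System.out.println("source-record-poll-total = " + polled);
    }
}
{code}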
[jira] [Created] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark
Jun Rao created KAFKA-6780: -- Summary: log cleaner shouldn't clean messages beyond high watermark Key: KAFKA-6780 URL: https://issues.apache.org/jira/browse/KAFKA-6780 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0 Reporter: Jun Rao Currently, the firstUncleanableDirtyOffset computed by the log cleaner is bounded by the first offset in the active segment. It's possible for the high watermark to be smaller than that. This may cause a committed record to be removed because of an uncommitted record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark
[ https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434646#comment-16434646 ] Jun Rao commented on KAFKA-6780: The fix is probably to further bound firstUncleanableDirtyOffset by the high watermark. > log cleaner shouldn't clean messages beyond high watermark > -- > > Key: KAFKA-6780 > URL: https://issues.apache.org/jira/browse/KAFKA-6780 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Jun Rao >Priority: Major > > Currently, the firstUncleanableDirtyOffset computed by the log cleaner is > bounded by the first offset in the active segment. It's possible for the high > watermark to be smaller than that. This may cause a committed record to be > removed because of an uncommitted record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
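A minimal sketch of that extra bound. The real logic lives in Scala in the log cleaner; the method and parameter names here are illustrative only, the point is that the cleaner must stop at whichever comes first, the start of the active segment or the high watermark.
{code:java}
final class UncleanableOffsetBound {
    // The cleaner may only clean offsets strictly below this value.
    static long firstUncleanableDirtyOffset(final long activeSegmentBaseOffset,
                                            final long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }
}
{code}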
[jira] [Updated] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark
[ https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6780: --- Fix Version/s: 1.1.1 1.2.0 > log cleaner shouldn't clean messages beyond high watermark > -- > > Key: KAFKA-6780 > URL: https://issues.apache.org/jira/browse/KAFKA-6780 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Jun Rao >Priority: Major > Fix For: 1.2.0, 1.1.1 > > > Currently, the firstUncleanableDirtyOffset computed by the log cleaner is > bounded by the first offset in the active segment. It's possible for the high > watermark to be smaller than that. This may cause a committed record to be > removed because of an uncommitted record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6773) Kafka consumer without group.id crashes when requesting offset on a topic-partition
[ https://issues.apache.org/jira/browse/KAFKA-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434735#comment-16434735 ] ASF GitHub Bot commented on KAFKA-6773: --- hachikuji closed pull request #4851: KAFKA-6773; Allow offset commit/fetch/describe with empty groupId URL: https://github.com/apache/kafka/pull/4851 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b7090761..cbbd91396b2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
 protocolType: String,
 protocols: List[(String, Array[Byte])],
 responseCallback: JoinCallback): Unit = {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
 responseCallback(joinError(memberId, error))
 return
 }
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
 memberId: String,
 groupAssignment: Map[String, Array[Byte]],
 responseCallback: SyncCallback): Unit = {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
 case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
 // The coordinator is loading, which means we've lost the state of the active rebalance and the
 // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
 }
 def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
 responseCallback(error)
 return
 }
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
 var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
 groupIds.foreach { groupId =>
- validateGroup(groupId) match {
+ validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
 case Some(error) =>
 groupErrors += groupId -> error
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
 memberId: String,
 generationId: Int,
 responseCallback: Errors => Unit) {
-validateGroup(groupId).foreach { error =>
+validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
 if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
 // the group is still loading, so respond just blindly
 responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
 producerEpoch: Short,
 offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
 responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
 case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
 case None =>
 val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
 generationId: Int,
 offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
 responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-validateGroup(groupId) match {
+validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
 case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
 case None =>
 groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
 def handleFetchOffsets(groupId: String,
 partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[Top
[jira] [Commented] (KAFKA-5697) StreamThread.shutdown() need to interrupt the stream threads to break the loop
[ https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434745#comment-16434745 ] ASF GitHub Bot commented on KAFKA-5697: --- vvcephei opened a new pull request #4855: KAFKA-5697: prevent poll() from blocking forever URL: https://github.com/apache/kafka/pull/4855 As the name says, currently `KafkaConsumer#poll` may block forever despite having a timeout parameter, since that timeout only applies to the `ConsumerNetworkClient#poll` and not the `ConsumerCoordinator#poll`. This change applies the timeout to the entire `poll` operation, allowing us to ensure threads won't hang forever in the event, e.g., that we can't talk to the coordinator. I've tried tomake the change in a 'private' fashion so as not to change any public APIs, but let me know if you think it still needs a KIP. Several tests depended on being able to send a timeout of 0, but still have the coordinator poll take non-zero time to do its work. I updated them to send a long enough timeout for the coordinator to to the required work. I believe our testing suite should be sufficient to validate this change. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamThread.shutdown() need to interrupt the stream threads to break the loop > -- > > Key: KAFKA-5697 > URL: https://issues.apache.org/jira/browse/KAFKA-5697 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: John Roesler >Priority: Major > Labels: newbie > > In {{StreamThread.shutdown()}} we currently do nothing but set the state, > hoping the stream thread may eventually check it and shutdown itself. > However, under certain scenarios the thread may get blocked within a single > loop and hence will never check on this state enum. For example, it's > {{consumer.poll}} call trigger {{ensureCoordinatorReady()}} which will block > until the coordinator can be found. If the coordinator broker is never up and > running then the Stream instance will be blocked forever. > A simple way to produce this issue is to start the work count demo without > starting the ZK / Kafka broker, and then it will get stuck in a single loop > and even `ctrl-C` will not stop it since its set state will never be read by > the thread: > {code:java} > [2017-08-03 15:17:39,981] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,046] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,101] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,206] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,261] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,366] WARN Connection to node -1 could not be > established. 
Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2017-08-03 15:17:40,472] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > ^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be > established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
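A rough sketch of the idea behind that pull request, shown with hypothetical helper names rather than the actual patch: a single deadline is computed once, and the remaining time is handed to each blocking step, so the overall call cannot exceed its timeout.
{code:java}
public final class BoundedPoll {

    // Each step must give up once its remainingMs budget elapses.
    interface Step {
        void run(long remainingMs);
    }

    // One deadline covers both the coordinator work and the fetch, so poll(timeout)
    // can no longer block past its argument. Names (pollCoordinator/pollFetches) are
    // placeholders, not Kafka APIs.
    public static void pollOnce(final long timeoutMs, final Step coordinatorPoll, final Step fetchPoll) {
        final long deadline = System.currentTimeMillis() + timeoutMs;

        long remaining = deadline - System.currentTimeMillis();
        if (remaining > 0) {
            coordinatorPoll.run(remaining);   // e.g. ensure the coordinator is known, join the group
        }

        remaining = deadline - System.currentTimeMillis();
        if (remaining > 0) {
            fetchPoll.run(remaining);         // e.g. the network client poll that returns records
        }
        // if the deadline has passed, return with whatever has been fetched so far (possibly nothing)
    }
}
{code}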
[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434752#comment-16434752 ] Jeremy Custenborder commented on KAFKA-4107: This should be implemented in the REST API. Maybe a combination of setting the offset and restarting the task? > Support offset reset capability in Kafka Connect > > > Key: KAFKA-4107 > URL: https://issues.apache.org/jira/browse/KAFKA-4107 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jason Gustafson >Priority: Major > > It would be useful in some cases to be able to reset connector offsets. For > example, if a topic in Kafka corresponding to a source database is > accidentally deleted (or deleted because of corrupt data), an administrator > may want to reset offsets and reproduce the log from the beginning. It may > also be useful to have support for overriding offsets, but that seems like a > less likely use case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6678) Upgrade dependencies with later release versions
[ https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434768#comment-16434768 ] Ted Yu commented on KAFKA-6678: --- Which release would drop support for Java 7 ? thanks > Upgrade dependencies with later release versions > > > Key: KAFKA-6678 > URL: https://issues.apache.org/jira/browse/KAFKA-6678 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ted Yu >Priority: Major > Attachments: k-update.txt > > > {code} > The following dependencies have later release versions: > - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1] > - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59] > - com.puppycrawl.tools:checkstyle [6.19 -> 8.8] > - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1] > - org.ajoberstar:grgit [1.9.3 -> 2.1.1] > - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26] > - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121] > - org.openjdk.jmh:jmh-core [1.19 -> 1.20] > - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20] > - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20] > - org.lz4:lz4-java [1.4 -> 1.4.1] > - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3] > - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0] > - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0] > - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3] > - org.scala-lang:scala-library [2.11.12 -> 2.12.4] > - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0] > - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4] > - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5] > {code} > Looks like we can consider upgrading scalatest, jmh-core and checkstyle -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434791#comment-16434791 ] ASF GitHub Bot commented on KAFKA-6058: --- guozhangwang opened a new pull request #4856: KAFKA-6058: Refactor consumer API result return types URL: https://github.com/apache/kafka/pull/4856 Refactored the return types in consumer group APIs the following way: ``` Map> DeleteConsumerGroupsResult#deletedGroups() Map> DescribeConsumerGroupsResult#describedGroups() KafkaFuture> ListConsumerGroupsResult#listings() KafkaFuture> ListConsumerGroupOffsetsResult#partitionsToOffsetAndMetadata() ``` 1. For DeleteConsumerGroupsResult and DescribeConsumerGroupsResult, for each group id we have two round-trips to get the coordinator, and then send the delete / describe request; I leave the potential optimization of batching requests for future work. 2. For ListConsumerGroupOffsetsResult, it is a simple single round-trip and hence the whole map is wrapped as a Future. 3. ListConsumerGroupsResult, it is the most tricky one: we would only know how many futures we should wait for after the first listNode returns, and hence I constructed the flattened future in the middle wrapped with the underlying map of futures. 3.a Another big change I made is, we do not return the exception in the flattened future if only a few of the nodes returns ERROR code, and instead just return the rest of the listings; to use that I added a new `anyOf` function in `KafkaFuture`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KIP-222: Add "describe consumer groups" and "list consumer groups" to > KafkaAdminClient > -- > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Major > Labels: kip-222 > Fix For: 1.2.0 > > > {{KafkaAdminClient}} does not allow to get information about consumer groups. > This feature is supported by old {{kafka.admin.AdminClient}} though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroup()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6773) Kafka consumer without group.id crashes when requesting offset on a topic-partition
[ https://issues.apache.org/jira/browse/KAFKA-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6773. Resolution: Fixed Fix Version/s: (was: 1.1.1) (was: 1.2.0) > Kafka consumer without group.id crashes when requesting offset on a > topic-partition > --- > > Key: KAFKA-6773 > URL: https://issues.apache.org/jira/browse/KAFKA-6773 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Konstantine Karantasis >Assignee: Jason Gustafson >Priority: Blocker > > After recent changes, when a KafkaConsumer requests the current offset on a > topic partition (e.g. via a call to {{position}}), the following exception is > thrown if the consumer doesn't belong to a consumer group ( {{group.id}} is > unset): > {code:java} > org.apache.kafka.common.KafkaException: Unexpected error in fetch offset > response: The configured groupId is invalid at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetFetchResponseHandler.handle(ConsumerCoordinator.java:835) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetFetchResponseHandler.handle(ConsumerCoordinator.java:818) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:292) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:469) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:446) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1788) > at > org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1429) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
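A minimal reproduction of the reported call path, assuming a broker on localhost:9092 and an existing topic named "foo" (both placeholders): with no group.id configured and the partition assigned manually, the position() call is what used to hit the KafkaException shown in the stack trace above.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GrouplessPositionRepro {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // note: no ConsumerConfig.GROUP_ID_CONFIG is set

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("foo", 0);
            consumer.assign(Collections.singletonList(tp));
            // triggers the offset fetch that raised the KafkaException before the fix
            System.out.println("position = " + consumer.position(tp));
        }
    }
}
{code}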
[jira] [Created] (KAFKA-6781) Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets
Jason Gustafson created KAFKA-6781: -- Summary: Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets Key: KAFKA-6781 URL: https://issues.apache.org/jira/browse/KAFKA-6781 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson {code} java.lang.AssertionError: expected:<10> but was:<0> Stacktrace java.lang.AssertionError: expected:<10> but was:<0> at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotEquals(Assert.java:834) at org.junit.Assert.assertEquals(Assert.java:645) at org.junit.Assert.assertEquals(Assert.java:631) at org.apache.kafka.clients.admin.KafkaAdminClientTest.testDescribeConsumerGroupOffsets(KafkaAdminClientTest.java:794) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6781) Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets
[ https://issues.apache.org/jira/browse/KAFKA-6781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434807#comment-16434807 ] Guozhang Wang commented on KAFKA-6781: -- This transient error is fixed along with https://github.com/apache/kafka/pull/4856/files > Transient failure in KafkaAdminClientTest.testDescribeConsumerGroupOffsets > -- > > Key: KAFKA-6781 > URL: https://issues.apache.org/jira/browse/KAFKA-6781 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > {code} > java.lang.AssertionError: expected:<10> but was:<0> > Stacktrace > java.lang.AssertionError: expected:<10> but was:<0> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testDescribeConsumerGroupOffsets(KafkaAdminClientTest.java:794) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at java.lang.Thread.run(Thread.java:745) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init
[ https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434864#comment-16434864 ] Jimin Hsieh commented on KAFKA-6775: Unless I'm misunderstanding, SmokeTestUtil#printProcessorSupplier has the same issue. > AbstractProcessor created in SimpleBenchmark should call super#init > --- > > Key: KAFKA-6775 > URL: https://issues.apache.org/jira/browse/KAFKA-6775 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Priority: Minor > Labels: easy-fix, newbie > > Around line 610: > {code} > return new AbstractProcessor() { > @Override > public void init(ProcessorContext context) { > } > {code} > super.init should be called above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6775) AbstractProcessor created in SimpleBenchmark should call super#init
[ https://issues.apache.org/jira/browse/KAFKA-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jimin Hsieh reassigned KAFKA-6775: -- Assignee: Jimin Hsieh > AbstractProcessor created in SimpleBenchmark should call super#init > --- > > Key: KAFKA-6775 > URL: https://issues.apache.org/jira/browse/KAFKA-6775 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ted Yu >Assignee: Jimin Hsieh >Priority: Minor > Labels: easy-fix, newbie > > Around line 610: > {code} > return new AbstractProcessor() { > @Override > public void init(ProcessorContext context) { > } > {code} > super.init should be called above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6705) producer.send() should not block due to metadata not available
[ https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434896#comment-16434896 ] Dong Lin commented on KAFKA-6705: - [~omkreddy] [~ijuma] Thanks for the reference to the previous discussion. It is very useful. After reading through the previous email discussion, it seems that it is not against the proposal here, i.e. producer.send() does not need to block on metadata update. I have created KIP [https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update] and opened a thread for discussion. Can you provide comments? Thanks! > producer.send() should not block due to metadata not available > -- > > Key: KAFKA-6705 > URL: https://issues.apache.org/jira/browse/KAFKA-6705 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > > Currently producer.send() may block on metadata for up to max.block.ms. This > behavior is well documented but it is a bit sub-optimal. Since we return a > future we should be able to make producer.send() completely non-blocking. One > idea is to simply insert the record into a global queue shared across all > partitions, and let the sender thread fetch records from this queue and send > them to the broker. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
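To illustrate the queue-plus-sender-thread idea from the description, here is an application-side sketch around the existing producer, not the KIP-286 design: callers enqueue and never block on metadata, while a single background thread pays the max.block.ms cost instead.
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingSender<K, V> implements AutoCloseable {
    private final BlockingQueue<ProducerRecord<K, V>> queue = new LinkedBlockingQueue<>();
    private final KafkaProducer<K, V> producer;
    private final Thread sender;
    private volatile boolean running = true;

    public NonBlockingSender(final KafkaProducer<K, V> producer) {
        this.producer = producer;
        this.sender = new Thread(() -> {
            try {
                while (running || !queue.isEmpty()) {
                    final ProducerRecord<K, V> record = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        producer.send(record);   // may still block on metadata, but off the caller's thread
                    }
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "non-blocking-sender");
        this.sender.start();
    }

    public void enqueue(final ProducerRecord<K, V> record) {
        queue.add(record);   // never blocks on metadata
    }

    @Override
    public void close() throws InterruptedException {
        running = false;
        sender.join();
        producer.close();
    }
}
{code}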
[jira] [Commented] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434927#comment-16434927 ] Veera commented on KAFKA-6052: -- Hi Jason / Vahid Can you please update about the release date of 1.1.1 or 1.2.0 ? Regards Veeru > Windows: Consumers not polling when isolation.level=read_committed > --- > > Key: KAFKA-6052 > URL: https://issues.apache.org/jira/browse/KAFKA-6052 > Project: Kafka > Issue Type: Bug > Components: consumer, producer >Affects Versions: 0.11.0.0, 1.0.1 > Environment: Windows 10. All processes running in embedded mode. >Reporter: Ansel Zandegran >Assignee: Vahid Hashemian >Priority: Major > Labels: transactions, windows > Fix For: 1.2.0, 1.1.1 > > Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, > logFile.log > > > *The same code is running fine in Linux.* I am trying to send a transactional > record with exactly once schematics. These are my producer, consumer and > broker setups. > public void sendWithTTemp(String topic, EHEvent event) { > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > //props.put("bootstrap.servers", > "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092"); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); > props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); > props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); > props.put(ProducerConfig.LINGER_MS_CONFIG, 1); > props.put("transactional.id", "TID" + transactionId.incrementAndGet()); > props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000"); > Producer producer = > new KafkaProducer<>(props, > new StringSerializer(), > new StringSerializer()); > Logger.log(this, "Initializing transaction..."); > producer.initTransactions(); > Logger.log(this, "Initializing done."); > try { > Logger.log(this, "Begin transaction..."); > producer.beginTransaction(); > Logger.log(this, "Begin transaction done."); > Logger.log(this, "Sending events..."); > producer.send(new ProducerRecord<>(topic, > event.getKey().toString(), > event.getValue().toString())); > Logger.log(this, "Sending events done."); > Logger.log(this, "Committing..."); > producer.commitTransaction(); > Logger.log(this, "Committing done."); > } catch (ProducerFencedException | OutOfOrderSequenceException > | AuthorizationException e) { > producer.close(); > e.printStackTrace(); > } catch (KafkaException e) { > producer.abortTransaction(); > e.printStackTrace(); > } > producer.close(); > } > *In Consumer* > I have set isolation.level=read_committed > *In 3 Brokers* > I'm running with the following properties > Properties props = new Properties(); > props.setProperty("broker.id", "" + i); > props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i)); > props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + > i); > props.setProperty("num.partitions", "1"); > props.setProperty("zookeeper.connect", "localhost:2181"); > props.setProperty("zookeeper.connection.timeout.ms", "6000"); > props.setProperty("min.insync.replicas", "2"); > props.setProperty("offsets.topic.replication.factor", "2"); > props.setProperty("offsets.topic.num.partitions", "1"); > props.setProperty("transaction.state.log.num.partitions", "2"); > props.setProperty("transaction.state.log.replication.factor", "2"); > props.setProperty("transaction.state.log.min.isr", "2"); > I am not getting any records in the 
consumer. When I set > isolation.level=read_uncommitted, I get the records. I assume that the > records are not getting commited. What could be the problem? log attached -- This message was sent by Atlassian JIRA (v7.6.3#76005)
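For completeness, a minimal consumer matching the setup described in the report (isolation.level=read_committed); the broker addresses, topic, and group id below are placeholders.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-test");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");   // only committed records are returned
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            final ConsumerRecords<String, String> records = consumer.poll(5000);
            System.out.println("records polled: " + records.count());
        }
    }
}
{code}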
[jira] [Comment Edited] (KAFKA-6052) Windows: Consumers not polling when isolation.level=read_committed
[ https://issues.apache.org/jira/browse/KAFKA-6052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16434927#comment-16434927 ] Veera edited comment on KAFKA-6052 at 4/12/18 4:58 AM: --- Hi Jason / Vahid Can you please update about the release date of 1.1.1 or 1.2.0 ? Regards Veeru was (Author: vmallavarapu): Hi Jason / Vahid Can you please update about the release date of 1.1.1 or 1.2.0 ? Regards Veeru > Windows: Consumers not polling when isolation.level=read_committed > --- > > Key: KAFKA-6052 > URL: https://issues.apache.org/jira/browse/KAFKA-6052 > Project: Kafka > Issue Type: Bug > Components: consumer, producer >Affects Versions: 0.11.0.0, 1.0.1 > Environment: Windows 10. All processes running in embedded mode. >Reporter: Ansel Zandegran >Assignee: Vahid Hashemian >Priority: Major > Labels: transactions, windows > Fix For: 1.2.0, 1.1.1 > > Attachments: Prducer_Consumer.log, Separate_Logs.zip, kafka-logs.zip, > logFile.log > > > *The same code is running fine in Linux.* I am trying to send a transactional > record with exactly once schematics. These are my producer, consumer and > broker setups. > public void sendWithTTemp(String topic, EHEvent event) { > Properties props = new Properties(); > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9092,localhost:9093,localhost:9094"); > //props.put("bootstrap.servers", > "34.240.248.190:9092,52.50.95.30:9092,52.50.95.30:9092"); > props.put(ProducerConfig.ACKS_CONFIG, "all"); > props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); > props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); > props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); > props.put(ProducerConfig.LINGER_MS_CONFIG, 1); > props.put("transactional.id", "TID" + transactionId.incrementAndGet()); > props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "5000"); > Producer producer = > new KafkaProducer<>(props, > new StringSerializer(), > new StringSerializer()); > Logger.log(this, "Initializing transaction..."); > producer.initTransactions(); > Logger.log(this, "Initializing done."); > try { > Logger.log(this, "Begin transaction..."); > producer.beginTransaction(); > Logger.log(this, "Begin transaction done."); > Logger.log(this, "Sending events..."); > producer.send(new ProducerRecord<>(topic, > event.getKey().toString(), > event.getValue().toString())); > Logger.log(this, "Sending events done."); > Logger.log(this, "Committing..."); > producer.commitTransaction(); > Logger.log(this, "Committing done."); > } catch (ProducerFencedException | OutOfOrderSequenceException > | AuthorizationException e) { > producer.close(); > e.printStackTrace(); > } catch (KafkaException e) { > producer.abortTransaction(); > e.printStackTrace(); > } > producer.close(); > } > *In Consumer* > I have set isolation.level=read_committed > *In 3 Brokers* > I'm running with the following properties > Properties props = new Properties(); > props.setProperty("broker.id", "" + i); > props.setProperty("listeners", "PLAINTEXT://:909" + (2 + i)); > props.setProperty("log.dirs", Configuration.KAFKA_DATA_PATH + "\\B" + > i); > props.setProperty("num.partitions", "1"); > props.setProperty("zookeeper.connect", "localhost:2181"); > props.setProperty("zookeeper.connection.timeout.ms", "6000"); > props.setProperty("min.insync.replicas", "2"); > props.setProperty("offsets.topic.replication.factor", "2"); > props.setProperty("offsets.topic.num.partitions", "1"); > props.setProperty("transaction.state.log.num.partitions", "2"); > 
props.setProperty("transaction.state.log.replication.factor", "2"); > props.setProperty("transaction.state.log.min.isr", "2"); > I am not getting any records in the consumer. When I set > isolation.level=read_uncommitted, I get the records. I assume that the > records are not getting commited. What could be the problem? log attached -- This message was sent by Atlassian JIRA (v7.6.3#76005)