[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988579#comment-16988579 ] Bruno Cadonna commented on KAFKA-9088: -- I understand all your concerns, and I have sometimes had a hard time with EasyMock as well. However, I think the issues are only partly caused by EasyMock. The design of the production code may leak internals, the unit test may have issues because it tests too much or the wrong aspects, and EasyMock may be used incorrectly. For instance, sometimes it does not make sense to specify how often a method on the mock is called, or whether it is called at all. That would be the case for the {{InternalProcessorContext}} mock, and EasyMock offers functionality that lets you specify exactly that. I would like to explore this part of EasyMock in this ticket. > Consolidate InternalMockProcessorContext and MockInternalProcessorContext > - > > Key: KAFKA-9088 > URL: https://issues.apache.org/jira/browse/KAFKA-9088 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Bruno Cadonna >Assignee: Pierangelo Di Pilato >Priority: Minor > Labels: newbie > > Currently, we have two mocks for the {{InternalProcessorContext}}. The goal > of this ticket is to merge both into one mock or replace it with an > {{EasyMock}} mock. -- This message was sent by Atlassian Jira (v8.3.4#803005)
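For context, a minimal sketch of the EasyMock feature Bruno refers to: a nice mock combined with {{anyTimes()}} stubs a method without asserting how often, or whether, it is called. The class name, the stubbed methods, and the return values below are illustrative only; a real test would stub whatever the code under test actually touches.
{code:java}
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;

import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

public class InternalProcessorContextMockSketch {

    public static InternalProcessorContext relaxedContext() {
        // A nice mock returns default values for unexpected calls instead of failing the test.
        final InternalProcessorContext context = niceMock(InternalProcessorContext.class);
        // anyTimes() makes no claim about how often (or whether) these methods are invoked.
        expect(context.applicationId()).andReturn("test-app").anyTimes();
        expect(context.taskId()).andReturn(new TaskId(0, 0)).anyTimes();
        replay(context);
        return context;
    }
}
{code}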
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988623#comment-16988623 ] Francisco Juan commented on KAFKA-7447: --- [~timvanlaer] we are seeing the same error that you saw with the message "Error loading offsets" message from the GroupMetadataManager class. We have two questions: 1. Were you able to reproduce the error? If so, could you give us a hint? 2. Did updating to version 2.2.2 fix the issue? Our setup looks like this: Kafka version: 2.2.1 Number of brokers: 30 Number of leader partitions: 15785 Number of consumer-groups: 1150 inter.broker.protocol.version=1.1 min.insync.replicas=2 > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. 
> ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broke
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988629#comment-16988629 ] Karolis Pocius commented on KAFKA-7447: --- [~francisco.juan], we went straight for 2.3.1, but seeing as the fix for KAFKA-8896 is included in both 2.2.2 and 2.3.1, I'd image upgrading should fix your issue. > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. 
(kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataM
[jira] [Commented] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988631#comment-16988631 ] Mickael Maison commented on KAFKA-9219: --- Great! I wanted to backport it to 2.4 but was waiting for 2.4.0 to release. Thanks for letting me know > NullPointerException when polling metrics from Kafka Connect > > > Key: KAFKA-9219 > URL: https://issues.apache.org/jira/browse/KAFKA-9219 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Mickael Maison >Priority: Major > Fix For: 2.4.0, 2.5.0 > > > The following stack trace appears: > > {code:java} > [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' > (org.apache.kafka.common.metrics.JmxReporter:202) > java.lang.NullPointerException > at > org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316) > at > org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190) > at > org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705) > at > javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449) > at > javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) > at > javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) > at > javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) > at > javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) > at sun.rmi.transport.Transport$1.run(Transport.java:200) > at sun.rmi.transport.Transport$1.run(Transport.java:197) > at java.security.AccessController.doPrivileged(Native Method) > at sun.rmi.transport.Transport.serviceCall(Transport.java:196) > at > sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) > at java.security.AccessController.doPrivileged(Native Method) > at > sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, > groupId=backup-mm2] Herder stopped > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopping > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609) > [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] > Herder stopped > 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:629) > [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. > (org.apache.kafka.connect.mirror.MirrorMaker:191) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8896) NoSuchElementException after coordinator move
[ https://issues.apache.org/jira/browse/KAFKA-8896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988654#comment-16988654 ] Francisco Juan commented on KAFKA-8896: --- Hello [~hachikuji], could you please provide the steps or scenario that triggers this error? We have a cluster running version 2.2.1 and it is throwing this same error +sometimes+ when there's a broker restart. We can correlate the error that you saw with an earlier error looking like "ERROR [GroupMetadataManager brokerId=2] Error loading offsets from __consumer_offsets". This happen short after a broker restarts. We can't reproduce this error in a test environment, which we would like to do to verify if an upgrade would actually fix our issue (some consumer-groups losing their offsets) Our cluster setup looks like this: {code:java} Kafka version: 2.2.1 Number of brokers: 30 Number of leader partitions: 15785 Number of consumer-groups: 1150 inter.broker.protocol.version=1.1 min.insync.replicas=2{code} Errors stack trace detail: {code:java} [2019-11-28 08:13:22,603] ERROR [KafkaApi-42] Error when handling request: clientId=enrichment-worker-kafka, correlationId=92, api=HEARTBEAT, body={group_id=enrichment-worker-importio-webhook-consumer-eu,generation_id=9877,member_id=enrichment-worker-kafka-25821b62-f36b-4e64-905b-92019e4a5493} (kafka.server.KafkaApis) java.util.NoSuchElementException: key not found: consumer-name-25821b62-f36b-4e64-905b-92019e4a5493 at scala.collection.MapLike.default(MapLike.scala:235) at scala.collection.MapLike.default$(MapLike.scala:234) at scala.collection.AbstractMap.default(Map.scala:63) at scala.collection.mutable.HashMap.apply(HashMap.scala:69) at kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203) at kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920) at kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34) at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294) at kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737) at kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730) at kafka.coordinator.group.GroupCoordinator.$anonfun$handleHeartbeat$2(GroupCoordinator.scala:486) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at kafka.coordinator.group.GroupCoordinator.handleHeartbeat(GroupCoordinator.scala:451) at kafka.server.KafkaApis.handleHeartbeatRequest(KafkaApis.scala:1336) at kafka.server.KafkaApis.handle(KafkaApis.scala:120) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748) [2019-11-28 08:13:18,175] ERROR [GroupMetadataManager brokerId=42] Error loading offsets from __consumer_offsets-24 (kafka.coordinator.group.GroupMetadataManager) 
java.util.NoSuchElementException: key not found: consumer-name-de868651-3166-46df-98c5-6196b9ade526 at scala.collection.MapLike.default(MapLike.scala:235) at scala.collection.MapLike.default(MapLike.scala:235) at scala.collection.MapLike.default$(MapLike.scala:234) at scala.collection.AbstractMap.default(Map.scala:63) at scala.collection.mutable.HashMap.apply(HashMap.scala:69) at kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203) at kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920) at kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34) at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388) at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294) at kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737) at kafka.coordinator.group.GroupCoordina
[jira] [Created] (KAFKA-9271) Client hangs until timeout when SSL handshake fails
Richard Wise created KAFKA-9271: --- Summary: Client hangs until timeout when SSL handshake fails Key: KAFKA-9271 URL: https://issues.apache.org/jira/browse/KAFKA-9271 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.0.1 Reporter: Richard Wise I accidentally forgot to set `security.protocol` to `SSL`, which meant that the Kafka client tried connecting to an SSL port via PLAINTEXT. I would expect this to fail immediately on the client side with an obvious message that the security settings do not match the server's. Instead, the client continually retries (I can see the SSL handshake failure messages in the server logs) before timing out on querying metadata. This behaviour is confusing and leads the developer to think that there is an issue of latency or load and that they need to increase hardware or timeouts. In addition, the continual retries (without any backoff) cause significant CPU load on both the client and server that can cause additional issues. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9272) Error when trying to connect to SSL port using PLAINTEXT is very confusing
Richard Wise created KAFKA-9272: --- Summary: Error when trying to connect to SSL port using PLAINTEXT is very confusing Key: KAFKA-9272 URL: https://issues.apache.org/jira/browse/KAFKA-9272 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.2.1 Reporter: Richard Wise I accidentally forgot to set `security.protocol` to SSL when connecting to an SSL port, so the client connected via PLAINTEXT. I expected the Kafka client to fail immediately with a message about mismatched security protocols, but instead it failed saying that the topic does not exist and auto-creation was not enabled. This error message is very misleading and leads developers to think that the problem is the server configuration, not the client's network security settings. -- This message was sent by Atlassian Jira (v8.3.4#803005)
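Both reports above stem from the same missing client setting. For reference, a minimal sketch of the configuration the reporter intended, assuming a plain consumer; the broker address, truststore path, password, and group id are placeholders.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SslClientConfigSketch {

    public static KafkaConsumer<String, String> newConsumer() {
        final Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093"); // TLS listener
        // The setting that was forgotten; without it the client defaults to PLAINTEXT.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/kafka/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }
}
{code}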
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988680#comment-16988680 ] Francisco Juan commented on KAFKA-7447: --- Thanks [~Karolis]. Would it be possible for you to describe an scenario where this issue is reproduced? I'd really like to confirm that updating the cluster version would fix this. Best regards > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. > ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. 
(kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling > loading of offsets and group metadata from __consumer_offsets-29 > (kafka.
[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988699#comment-16988699 ] Karolis Pocius commented on KAFKA-7447: --- We've encountered the issue not after a clean shutdown, as described originally, but when one of the brokers lost zookeeper connection and then reconnected. So to test the fix I was simply closing zookeeper port using iptables on one of the brokers for a few seconds and reopening it. > Consumer offsets lost during leadership rebalance after bringing node back > from clean shutdown > -- > > Key: KAFKA-7447 > URL: https://issues.apache.org/jira/browse/KAFKA-7447 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.0 >Reporter: Ben Isaacs >Priority: Major > > *Summary:* > * When 1 of my 3 brokers is cleanly shut down, consumption and production > continues as normal due to replication. (Consumers are rebalanced to the > replicas, and producers are rebalanced to the remaining brokers). However, > when the cleanly-shut-down broker comes back, after about 10 minutes, a > flurry of production errors occur and my consumers suddenly go back in time 2 > weeks, causing a long outage (12 hours+) as all messages are replayed on some > topics. > * The hypothesis is that the auto-leadership-rebalance is happening too > quickly after the downed broker returns, before it has had a chance to become > fully synchronised on all partitions. In particular, it seems that having > consumer offets ahead of the most recent data on the topic that consumer was > following causes the consumer to be reset to 0. > *Expected:* > * bringing a node back from a clean shut down does not cause any consumers > to reset to 0. > *Actual:* > * I experience approximately 12 hours of partial outage triggered at the > point that auto leadership rebalance occurs, after a cleanly shut down node > returns. > *Workaround:* > * disable auto leadership rebalance entirely. > * manually rebalance it from time to time when all nodes and all partitions > are fully replicated. > *My Setup:* > * Kafka deployment with 3 brokers and 2 topics. > * Replication factor is 3, for all topics. > * min.isr is 2, for all topics. > * Zookeeper deployment with 3 instances. > * In the region of 10 to 15 consumers, with 2 user topics (and, of course, > the system topics such as consumer offsets). Consumer offsets has the > standard 50 partitions. The user topics have about 3000 partitions in total. > * Offset retention time of 7 days, and topic retention time of 14 days. > * Input rate ~1000 messages/sec. > * Deployment happens to be on Google compute engine. > *Related Stack Overflow Post:* > https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker > It was suggested I open a ticket by "Muir" who says he they have also > experienced this. > *Transcription of logs, showing the problem:* > Below, you can see chronologically sorted, interleaved, logs from the 3 > brokers. prod-kafka-2 is the node which was cleanly shut down and then > restarted. I filtered the messages only to those regardling > __consumer_offsets-29 because it's just too much to paste, otherwise. 
> ||Broker host||Broker ID|| > |prod-kafka-1|0| > |prod-kafka-2|1 (this one was restarted)| > |prod-kafka-3|2| > prod-kafka-2: (just starting up) > {code} > [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Based on follower's leader epoch, leader replied with an unknown > offset in __consumer_offsets-29. The initial fetch offset 0 will be used for > truncation. (kafka.server.ReplicaFetcherThread) > {code} > prod-kafka-3: (sees replica1 come back) > {code} > [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] > Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition) > {code} > prod-kafka-2: > {code} > [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling > unloading of offsets and group metadata from __consumer_offsets-29 > (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished > unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached > groups. (kafka.coordinator.group.GroupMetadataManager) > [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed > fetcher for partitions __consumer_offsets-29 > (kafka.server.ReplicaFetcherManager) > [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] > __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous > Leader Epoch was: 77 (kafka.cluster.Partition) > [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1
[jira] [Commented] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException
[ https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988732#comment-16988732 ] Vijay commented on KAFKA-9250: -- Thanks [~guozhang], we are planning to update kafka clients dependency to latest one (2.3.1) available as there are some more defects fixed from 2.2.0 and will update the results here. > Kafka streams stuck in rebalancing state after UnknownHostException > --- > > Key: KAFKA-9250 > URL: https://issues.apache.org/jira/browse/KAFKA-9250 > Project: Kafka > Issue Type: Bug > Components: clients, config, network, streams >Affects Versions: 2.2.0 >Reporter: Vijay >Priority: Critical > > We are using kafka streams (2.2.0) application for reading messages from > source topic do some transformation and send to destination topic. > Application started fine and processed messages till it encountered > UnknownHostException, after which application is hung in rebalancing state > and not processing messages. > > Below are the properties we have configured : > application.id = * > bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3" > num.stream.threads=3 > replication.factor=3 > num.standby.replicas=1 > max.block.ms=8640 > acks=all > auto.offset.reset=earliest > processing.guarantee=exactly_once > > Additional details. > Number of brokers - 3 > Source topic partition count - 12 and replication factor of 3 > Destination topic partition count - 12 and replication factor of 3 > 4 instances of stream application are deployed in docker containers. > > Below are the some of the logs : > {noformat} > [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] > o.a.k.clients.NetworkClient - [Consumer > clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer, > groupId=streams-example] Error connecting to node hostname1:port1 (id: > 2147438464 rack: null) > java.net.UnknownHostException: hostname1 > at java.net.InetAddress.getAllByName0(InetAddress.java:1280) > at java.net.InetAddress.getAllByName(InetAddress.java:1192) > at java.net.InetAddress.getAllByName(InetAddress.java:1126) > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387) > > at > org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121) > > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917) > > at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235) > > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941) > > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850) > > at > org.apache.kafka.streams.proc
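For reference, the settings listed in the KAFKA-9250 report above map onto a Streams configuration roughly as follows. Note that {{acks}} and {{max.block.ms}} are producer settings and {{auto.offset.reset}} a consumer setting, so they are forwarded with the corresponding prefixes. The application id, host names, and the {{max.block.ms}} value (which appears truncated in the report) are placeholders.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsExampleConfigSketch {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hostname1:9092,hostname2:9092,hostname3:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Client-level settings are forwarded to the embedded clients via prefixes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 86400000L);
        return props;
    }
}
{code}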
[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker
[ https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988777#comment-16988777 ] Oleg Muravskiy commented on KAFKA-9173: --- I understand that this is how it is *implemented*, but it is not how it is *documented*, or at least I don't see it anywhere in the Streams documentation, apart from the description of the (default and only one) {{DefaultPartitionGrouper}}. Is there a reason for such behaviour? Could I just implement an alternative {{PartitionGrouper}} that will assign each partition to a new task? > StreamsPartitionAssignor assigns partitions to only one worker > -- > > Key: KAFKA-9173 > URL: https://issues.apache.org/jira/browse/KAFKA-9173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.2.1 >Reporter: Oleg Muravskiy >Priority: Major > Labels: user-experience > Attachments: StreamsPartitionAssignor.log > > > I'm running a distributed KafkaStreams application on 10 worker nodes, > subscribed to 21 topics with 10 partitions in each. I'm only using a > Processor interface, and a persistent state store. > However, only one worker gets assigned partitions, all other workers get > nothing. Restarting the application, or cleaning local state stores does not > help. StreamsPartitionAssignor migrates to other nodes, and eventually picks > up other node to assign partitions to, but still only one node. > It's difficult to figure out where to look for the signs of problems, I'm > attaching the log messages from the StreamsPartitionAssignor. Let me know > what else I could provide to help resolve this. > [^StreamsPartitionAssignor.log] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-9261) NPE when updating client metadata
[ https://issues.apache.org/jira/browse/KAFKA-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar reopened KAFKA-9261: -- > NPE when updating client metadata > - > > Key: KAFKA-9261 > URL: https://issues.apache.org/jira/browse/KAFKA-9261 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.4.0, 2.3.2 > > > We have seen the following exception recently: > {code} > java.lang.NullPointerException > at java.base/java.util.Objects.requireNonNull(Objects.java:221) > at org.apache.kafka.common.Cluster.(Cluster.java:134) > at org.apache.kafka.common.Cluster.(Cluster.java:89) > at > org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120) > at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:82) > at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:58) > at > org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325) > at org.apache.kafka.clients.Metadata.update(Metadata.java:252) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > {code} > The client assumes that if a leader is included in the response, then node > information must also be available. There are at least a couple possible > reasons this assumption can fail: > 1. The client is able to detect stale partition metadata using leader epoch > information available. If stale partition metadata is detected, the client > ignores it and uses the last known metadata. However, it cannot detect stale > broker information and will always accept the latest update. This means that > the latest metadata may be a mix of multiple metadata responses and therefore > the invariant will not generally hold. > 2. There is no lock which protects both the fetching of partition metadata > and the live broker when handling a Metadata request. This means an > UpdateMetadata request can arrive concurrently and break the intended > invariant. > It seems case 2 has been possible for a long time, but it should be extremely > rare. Case 1 was only made possible with KIP-320, which added the leader > epoch tracking. It should also be rare, but the window for inconsistent > metadata is probably a bit bigger than the window for a concurrent update. > To fix this, we should make the client more defensive about metadata updates > and not assume that the leader is among the live endpoints. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext
[ https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988913#comment-16988913 ] John Roesler commented on KAFKA-9088: - Thanks, Bruno, I don’t want you to feel besieged. It seems like this conversation has yielded a good list of common pitfalls with EasyMock, but at the end of the day the only way to know for sure is to try to make it work. It should be obvious from the diff whether it really makes things easier or not. As long as it’s approached in the spirit of a prototype, it shouldn’t be too frustrating to just take another stab at the implementation. Just one specific thought... It may be possible that some tests don’t need a rich interaction with the context. In those cases, maybe they can just create a mock inline. This might free up some of the constraints to simplify MockInternalProcessorContext for use with just the tests that need a more complex interaction. I.e., sometimes trying for a general drop-in replacement isn’t necessary, and just complicates everything. > Consolidate InternalMockProcessorContext and MockInternalProcessorContext > - > > Key: KAFKA-9088 > URL: https://issues.apache.org/jira/browse/KAFKA-9088 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Bruno Cadonna >Assignee: Pierangelo Di Pilato >Priority: Minor > Labels: newbie > > Currently, we have two mocks for the {{InternalProcessorContext}}. The goal > of this ticket is to merge both into one mock or replace it with an > {{EasyMock}} mock. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988923#comment-16988923 ] John Roesler commented on KAFKA-9270: - Hey Rohan, There is a Consumer config to control how long it will wait for an ack on commit. You can just increase this config from the default of 60 seconds by passing it in as part of your Streams configs. > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
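A minimal sketch of the workaround John describes, assuming the setting he refers to is the consumer's {{default.api.timeout.ms}}, which defaults to 60 seconds and bounds blocking calls such as offset commits. The application id, bootstrap servers, and the three-minute value are placeholders; the properties would be passed to the {{KafkaStreams}} constructor as usual.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CommitTimeoutConfigSketch {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // Raise the consumer's blocking-call timeout from the 60000 ms default to 3 minutes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 180000);
        return props;
    }
}
{code}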
[jira] [Created] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes
Bill Bejeck created KAFKA-9273: -- Summary: Refactor AbstractJoinIntegrationTest and Sub-classes Key: KAFKA-9273 URL: https://issues.apache.org/jira/browse/KAFKA-9273 Project: Kafka Issue Type: Test Components: streams Reporter: Bill Bejeck Assignee: Bill Bejeck The AbstractJoinIntegrationTest uses an embedded broker, but not all the sub-classes require the use of an embedded broker anymore. Additionally, there are two tests remaining that require an embedded broker, but they don't perform joins; they are tests validating other conditions, so ideally those tests should move into a separate test. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9268) Follow-on: Streams Threads may die from recoverable errors with EOS enabled
[ https://issues.apache.org/jira/browse/KAFKA-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988953#comment-16988953 ] John Roesler commented on KAFKA-9268: - The only compatibility concern I'm aware of is that 2.2.1+ requires broker message format 0.11+ . But there might be other constraints keeping people on older versions of Streams, the biggest one being a desire to use more fully-baked versions, rather than bleeding-edge ones. Still, this might be the kind of problem you can live with, unless your environment is _extremely_ flaky. I created this ticket partly just to document that we're aware of the flaw, which hopefully would be helpful to a person trying to get to the bottom of why their threads are dying. Also, it captures valuable (and hard-won) context in case someone does want to pick this up and fix it for older versions. > Follow-on: Streams Threads may die from recoverable errors with EOS enabled > --- > > Key: KAFKA-9268 > URL: https://issues.apache.org/jira/browse/KAFKA-9268 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: John Roesler >Priority: Major > Attachments: 2.2-eos-failures-1.txt, 2.2-eos-failures-2.txt > > > While testing Streams in EOS mode under frequent and heavy network > partitions, I've encountered exceptions leading to thread death in both 2.2 > and 2.3 (although different exceptions). > I believe this problem is addressed in 2.4+ by > https://issues.apache.org/jira/browse/KAFKA-9231 , however, if you look at > the ticket and corresponding PR, you will see that the solution there > introduced some tech debt around UnknownProducerId that needs to be cleaned > up. Therefore, I'm not backporting that fix to older branches. Rather, I'm > opening a new ticket to make more conservative changes in older branches to > improve resilience, if desired. > These failures are relative rare, so I don't think that a system or > integration test could reliably reproduce it. The steps to reproduce would be: > 1. set up a long-running Streams application with EOS enabled (I used three > Streams instances) > 2. inject periodic network partitions (I had each Streams instance schedule > an interruption at a random time between 0 and 3 hours, then schedule the > interruption to last a random duration between 0 and 5 minutes. The > interruptions are accomplished by using iptables to drop all traffic to/from > all three brokers) > As far as the actual errors I've observed, I'm attaching the logs of two > incidents in which a thread was caused to shut down. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988991#comment-16988991 ] Boyang Chen commented on KAFKA-9270: [~vvcephei] In fact timeout exceptions is very common thing, so we probably want to discuss whether we should fail the thread immediately when there is a timeout, for example https://issues.apache.org/jira/browse/KAFKA-8803 has a similar problem with initialization. > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker
[ https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989070#comment-16989070 ] Sophie Blee-Goldman commented on KAFKA-9173: That's a good point, it does need to be documented. We should consider making this more flexible, either by making this user-customizable (for example by adding a `separateNodeGroups` flag to the StreamsBuilder.stream(Pattern) overload) or maybe by autoscaling according to some heuristic. As far as workarounds for now, I wouldn't necessarily recommend implementing a custom PartitionGrouper since that's been deprecated in 2.4. You could always use normal topic subscription to source each topic individually and then do something like for (KStream stream : inputTopics) { processingTopology(stream); } (see the sketch below). > StreamsPartitionAssignor assigns partitions to only one worker > -- > > Key: KAFKA-9173 > URL: https://issues.apache.org/jira/browse/KAFKA-9173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.2.1 >Reporter: Oleg Muravskiy >Priority: Major > Labels: user-experience > Attachments: StreamsPartitionAssignor.log > > > I'm running a distributed KafkaStreams application on 10 worker nodes, > subscribed to 21 topics with 10 partitions in each. I'm only using a > Processor interface, and a persistent state store. > However, only one worker gets assigned partitions, all other workers get > nothing. Restarting the application, or cleaning local state stores does not > help. StreamsPartitionAssignor migrates to other nodes, and eventually picks > up other node to assign partitions to, but still only one node. > It's difficult to figure out where to look for the signs of problems, I'm > attaching the log messages from the StreamsPartitionAssignor. Let me know > what else I could provide to help resolve this. > [^StreamsPartitionAssignor.log] -- This message was sent by Atlassian Jira (v8.3.4#803005)
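A rough sketch of the per-topic subscription workaround Sophie suggests above. The topic names are placeholders and {{addProcessingTopology}} stands in for the application's actual per-topic processing; sourcing each topic separately puts its partitions into their own tasks instead of grouping same-numbered partitions of all topics into a single task.
{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class PerTopicSourcesSketch {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final List<String> inputTopics = Arrays.asList("topic-1", "topic-2", "topic-3");
        for (final String topic : inputTopics) {
            // Each topic gets its own source node, and hence its own sub-topology and tasks.
            final KStream<String, String> stream = builder.stream(topic);
            addProcessingTopology(stream);
        }
        return builder.build();
    }

    private static void addProcessingTopology(final KStream<String, String> stream) {
        // Placeholder for the real processing (processors, state stores, sinks).
        stream.foreach((key, value) -> { });
    }
}
{code}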
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989123#comment-16989123 ] John Roesler commented on KAFKA-9270: - Agreed, [~bchen225242], and I'm actually on board with your line of thinking, but I wanted to make sure that [~rohan26may] gets an actionable answer to his question, which is that he needs to increase the timeout configuration. Can you perhaps create a new ticket, or a thread in the dev mailing list to discuss changing Streams's general approach for handling transient client failures? > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9131) failed producer metadata updates result in the unrelated error message
[ https://issues.apache.org/jira/browse/KAFKA-9131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989159#comment-16989159 ] ASF GitHub Bot commented on KAFKA-9131: --- guozhangwang commented on pull request #7635: KAFKA-9131: Remove dead code for handling timeout exception URL: https://github.com/apache/kafka/pull/7635 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > failed producer metadata updates result in the unrelated error message > -- > > Key: KAFKA-9131 > URL: https://issues.apache.org/jira/browse/KAFKA-9131 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Gleb Komissarov >Assignee: Gleb Komissarov >Priority: Major > > {{Producer Metadata TimeoutException}} is processed as a generic > RetriableException in RecordCollectorImpl.sendError. This results in an > irrelevant error message. > We were supposed to see this > "Timeout exception caught when sending record to topic %s. " + > "This might happen if the producer cannot send data to the Kafka cluster and > thus, " + > "its internal buffer fills up. " + > "This can also happen if the broker is slow to respond, if the network > connection to " + > "the broker was interrupted, or if similar circumstances arise. " + > "You can increase producer parameter `max.block.ms` to increase this > timeout." > but got this: > "You can increase the producer configs `delivery.timeout.ms` and/or " + > "`retries` to avoid this error. Note that `retries` is set to infinite by > default." > These params are not applicable to metadata updates. > Technical details: > (1) Lines 221 - 236 in > kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java > are dead code. They are never executed because {{producer.send}} never throws > TimeoutException, but returns a failed future. You can see it in lines > 948-955 in > kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > (2) The exception is then processed in a callback function in the method > {{recordSendError}} on line 202. The DefaultProductionExceptionHandler is > used. > (3) in {{recordSendError}} in the same class the timeout exception is > processed as RetriableException at lines 133-136. The error message is simply > wrong because tweaking {{[delivery.timeout.ms|http://delivery.timeout.ms/]}} > and {{retries}} has nothing to do with the issue in this case. > Proposed solution: > (1) Remove unreachable catch (final TimeoutException e) in > RecordCollectorImpl.java as Producer does not throw ApiExceptions. > (2) Move the aforementioned catch clause to recordSendError method. > (3) Process TimeoutException separately from RetiriableException. > (4) Implement a unit test to cover this corner case > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
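A plain-producer sketch of the behavior the ticket describes (this is not the Streams-internal RecordCollectorImpl code; topic name and broker address are made up): the timeout is not thrown from send() to the caller, it surfaces asynchronously through the callback (and the returned future), which is why the catch block is unreachable.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendTimeoutCallbackDemo {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"), (metadata, exception) -> {
                // A timeout shows up here, not as an exception thrown by send() itself,
                // so timeout-specific error messages belong on this callback path.
                if (exception instanceof TimeoutException) {
                    System.err.println("Send timed out: " + exception.getMessage());
                } else if (exception != null) {
                    System.err.println("Send failed: " + exception);
                }
            });
        }
    }
}
{code}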
[jira] [Commented] (KAFKA-9131) failed producer metadata updates result in the unrelated error message
[ https://issues.apache.org/jira/browse/KAFKA-9131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989161#comment-16989161 ] Guozhang Wang commented on KAFKA-9131: -- [~gkomissarov] Thanks for reporting and fixing this! > failed producer metadata updates result in the unrelated error message > -- > > Key: KAFKA-9131 > URL: https://issues.apache.org/jira/browse/KAFKA-9131 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: Gleb Komissarov >Assignee: Gleb Komissarov >Priority: Major > Fix For: 2.5.0 > > > {{Producer Metadata TimeoutException}} is processed as a generic > RetriableException in RecordCollectorImpl.sendError. This results in an > irrelevant error message. > We were supposed to see this > "Timeout exception caught when sending record to topic %s. " + > "This might happen if the producer cannot send data to the Kafka cluster and > thus, " + > "its internal buffer fills up. " + > "This can also happen if the broker is slow to respond, if the network > connection to " + > "the broker was interrupted, or if similar circumstances arise. " + > "You can increase producer parameter `max.block.ms` to increase this > timeout." > but got this: > "You can increase the producer configs `delivery.timeout.ms` and/or " + > "`retries` to avoid this error. Note that `retries` is set to infinite by > default." > These params are not applicable to metadata updates. > Technical details: > (1) Lines 221 - 236 in > kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java > are dead code. They are never executed because {{producer.send}} never throws > TimeoutException, but returns a failed future. You can see it in lines > 948-955 in > kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > (2) The exception is then processed in a callback function in the method > {{recordSendError}} on line 202. The DefaultProductionExceptionHandler is > used. > (3) in {{recordSendError}} in the same class the timeout exception is > processed as RetriableException at lines 133-136. The error message is simply > wrong because tweaking {{[delivery.timeout.ms|http://delivery.timeout.ms/]}} > and {{retries}} has nothing to do with the issue in this case. > Proposed solution: > (1) Remove unreachable catch (final TimeoutException e) in > RecordCollectorImpl.java as Producer does not throw ApiExceptions. > (2) Move the aforementioned catch clause to recordSendError method. > (3) Process TimeoutException separately from RetiriableException. > (4) Implement a unit test to cover this corner case > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
Boyang Chen created KAFKA-9274: -- Summary: Gracefully handle timeout exceptions on Kafka Streams Key: KAFKA-9274 URL: https://issues.apache.org/jira/browse/KAFKA-9274 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen Right now streams don't treat timeout exception as retriable in general by throwing it to the application level. If not handled by the user, this would kill the stream thread unfortunately. In fact, timeouts happen mostly due to network issue or server side unavailability. Hard failure on client seems to be an over-kill. We would like to discuss what's the best practice to handle timeout exceptions on Streams. The current state is still brainstorming and consolidate all the cases that contain timeout exception within this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989187#comment-16989187 ] Guozhang Wang commented on KAFKA-9225: -- [~vvcephei] Can we just require users to make code changes in their rocksDB options APIs in 3.0 while doing the upgrade in place? > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Blocker > Labels: incompatible > Fix For: 3.0.0 > > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. The instance will be in error state and should be closed. 
> (org.apach e.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni13777546368576524 84.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analyze:* > This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 > native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? Should there be any kind of tests to execute, please > kindly point me. Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
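For context on "the rocksDB options APIs": the user-facing hook is `RocksDBConfigSetter`, which hands out RocksDB's own `Options` class, so a major rocksdbjni bump can force changes in user code like the hedged sketch below (values are arbitrary; the method names match the 5.18.x JNI API, some of which changed shape in newer RocksDB versions).

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Example of user code that programs directly against RocksDB classes and would
// therefore be exposed to a rocksdbjni major-version upgrade.
public class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // arbitrary example value
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);               // arbitrary example value
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing allocated above needs explicit freeing in this sketch.
    }
}
{code}

Such a class would be wired in with `props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);`.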
[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989190#comment-16989190 ] Bill Bejeck commented on KAFKA-9274: At the risk of stating the obvious, I think we should have a default number of `num.timeout.retries`. The `num.timeout.retries` would be a new configuration (KIP required) that gets applied across the board in a Streams application for all cases where there could be a timeout connecting to the broker. > Gracefully handle timeout exceptions on Kafka Streams > - > > Key: KAFKA-9274 > URL: https://issues.apache.org/jira/browse/KAFKA-9274 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Major > > Right now streams don't treat timeout exception as retriable in general by > throwing it to the application level. If not handled by the user, this would > kill the stream thread unfortunately. > In fact, timeouts happen mostly due to network issue or server side > unavailability. Hard failure on client seems to be an over-kill. > We would like to discuss what's the best practice to handle timeout > exceptions on Streams. The current state is still brainstorming and > consolidate all the cases that contain timeout exception within this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
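No `num.timeout.retries` configuration exists today (it would require a KIP); the sketch below only illustrates the bounded-retry behaviour such a setting could govern, with all names invented for illustration.

{code:java}
import org.apache.kafka.common.errors.TimeoutException;

public final class BoundedTimeoutRetry {

    // Hypothetical stand-in for a future `num.timeout.retries` value.
    private static final int NUM_TIMEOUT_RETRIES = 5;

    interface BrokerCall {
        void run(); // may throw the (unchecked) Kafka TimeoutException
    }

    // Retry a broker-bound operation a bounded number of times before letting the
    // TimeoutException propagate (which, today, would kill the stream thread).
    static void withTimeoutRetries(final BrokerCall call) {
        TimeoutException last = null;
        for (int attempt = 0; attempt <= NUM_TIMEOUT_RETRIES; attempt++) {
            try {
                call.run();
                return;
            } catch (final TimeoutException e) {
                last = e; // assumed transient: back off and try again
                try {
                    Thread.sleep(100L * (attempt + 1));
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
        throw last;
    }
}
{code}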
[jira] [Updated] (KAFKA-6989) Support Async Processing in Streams
[ https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6989: - Labels: api needs-kip (was: needs-kip) > Support Async Processing in Streams > --- > > Key: KAFKA-6989 > URL: https://issues.apache.org/jira/browse/KAFKA-6989 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: api, needs-kip > > KIP WIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams] > Today Kafka Streams uses a single-thread-per-task architecture to achieve > embarrassing parallelism and good isolation. However, there are a couple of > scenarios where async processing may be preferable: > 1) External resource access or heavy IO with high latency. Suppose you need > to access a remote REST API, read / write to an external store, or do a heavy > disk IO operation that may result in high latency. The current threading model > would block all other records until this record is done, waiting on the > remote call / IO to finish. > 2) Robust failure handling with retries. Imagine the app-level processing of > a (non-corrupted) record fails (e.g. the user attempted an RPC to an > external system, and this call failed), and failed records are moved into a > separate "retry" topic. How can you process such failed records in a scalable > way? For example, imagine you need to implement a retry policy such as "retry > with exponential backoff". Here, you have the problem that 1. you can't > really pause processing a single record because this will pause the > processing of the full stream (bottleneck!) and 2. there is no > straight-forward way to "sort" failed records based on their "next retry > time" (think: priority queue). > 3) Delayed processing. One use case is delaying re-processing (e.g. "delay > re-processing this event for 5 minutes") as mentioned in 2); another is > implementing a scheduler: e.g. do some additional operations later based on > this processed record (Zalando Dublin, for example, is implementing > a distributed web crawler). Note that although this feature can be handled in > punctuation, it is not well aligned with our current offset committing > behavior, which always advances the offset once the record has finished > traversing the topology. > I'm thinking of two options to support this feature: > 1. Make the commit() mechanism more customizable so that users can > implement multi-threaded processing themselves: users can always do async > processing in the Processor API by spawning a thread-pool, e.g., but the key > is that the offset to be committed should only be advanced once such async > processing is done. This is a light-weight approach: we provide all the > pieces and tools, and users stack them up to build their own LEGOs. > 2. Provide a general API to do async processing in the Processor API, and take > care of offset committing internally. This is a heavy-weight approach: > the API may not cover all async scenarios, but it is an easy way to cover the > majority of remaining scenarios, and users do not need to worry about internal > implementation details such as offsets and fault tolerance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
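A minimal, hypothetical sketch of option 1 (all class and method names are invented): the high-latency work runs on a thread pool inside a Processor, and results are only forwarded from a punctuation, which runs on the stream thread. Note that, as the ticket points out, this alone does not stop offsets from advancing before the async work completes; that is exactly the gap a commit()-aware API would need to close.

{code:java}
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class AsyncLookupProcessor implements Processor<String, String> {

    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final Queue<Future<KeyValue<String, String>>> pending = new ConcurrentLinkedQueue<>();
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        // Drain finished async results on the stream thread, where forward() is safe to call.
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> drain());
    }

    @Override
    public void process(final String key, final String value) {
        // Hand the high-latency call (e.g. a remote REST lookup) to the pool and return immediately.
        pending.add(pool.submit(() -> KeyValue.pair(key, slowLookup(value))));
    }

    private void drain() {
        Future<KeyValue<String, String>> head;
        while ((head = pending.peek()) != null && head.isDone()) {
            pending.poll();
            try {
                final KeyValue<String, String> result = head.get();
                context.forward(result.key, result.value);
            } catch (final Exception e) {
                // Real code would retry here or route the record to a dead-letter topic.
            }
        }
    }

    private String slowLookup(final String value) {
        return value; // placeholder for the external call
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}
{code}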
[jira] [Updated] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-9073: - Fix Version/s: 2.3.2 > Kafka Streams State stuck in rebalancing after one of the StreamThread > encounters java.lang.IllegalStateException: No current assignment for > partition > -- > > Key: KAFKA-9073 > URL: https://issues.apache.org/jira/browse/KAFKA-9073 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: amuthan Ganeshan >Priority: Major > Fix For: 2.4.0, 2.3.2 > > Attachments: KAFKA-9073.log > > > I have a Kafka stream application that stores the incoming messages into a > state store, and later during the punctuation period, we store them into a > big data persistent store after processing the messages. > The application consumes from 120 partitions distributed across 40 instances. > The application has been running fine without any problem for months, but all > of a sudden some of the instances failed because of a stream thread exception > saying > ```java.lang.IllegalStateException: No current assignment for partition > --changelog-98``` > > And other instances are stuck in the REBALANCING state, and never comes out > of it. Here is the full stack trace, I just masked the application-specific > app name and store name in the stack trace due to NDA. > > ``` > 2019-10-21 13:27:13,481 ERROR > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread > [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] > Encountered the following error during processing: > java.lang.IllegalStateException: No current assignment for partition > application.id-store_name-changelog-98 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618) > at > org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > ``` > > Now I checked the state store disk usage; it is less than 40% of the total > disk space available. Restarting the application solves the problem for a > short amount of time, but the error pops up again on some other > instances quickly. I tried to change the retries and retry.backoff.ms > configuration, but it did not help at all > ``` > retries = 2147483647 > retry.backoff.ms > ``` > After googling for some time I found there was a similar bug reported to the > Kafka team in the past, and also noticed my stack trace exactly matches > the stack trace of the reported bug. > Here is the link for the similar bug reported about a year ago. > https://issues.apache.org/jira/browse/KAFKA-7181 > > Now I am wondering
[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989195#comment-16989195 ] John Roesler commented on KAFKA-9274: - Thanks for starting this, [~bchen225242]! One thing we should document is that the current approach is actually intentional. Streams essentially delegates the handling of transient failures into the clients (Consumer and Producer and Admin). Accordingly, the current advice is to set the client timeouts high enough to tolerate transient outages. However, I agree with you that this is not the best approach. Setting the timeout high enough to be resilient to most network outages has the perverse effect of allowing StreamThreads to get hung up on an individual operation for that amount of time, harming the overall ability of the system to make progress on all inputs. For example, what if only one broker is unhealthy/unavailable? Rather than sit around indefinitely waiting to commit on the partition that broker is leader for, we could attempt to make progress on other tasks, and then try to commit the stuck one again later. Another issue is self-healing. Suppose that the broker cluster becomes unavailable. If we're running an application in which some or all of the threads will die from timeouts, then whenever the broker's operators _do_ bring it back up, we would have to actually restart the application to recover. Maybe this doesn't seem so bad, but it you're in a large organization operating 100 Streams apps, it sounds like a pretty big pain to me. Conversely, if Streams were to just log a warning each time it got a timeout, but continue trying to make progress, then we would know that there is something wrong (because we're monitoring the app for warnings), so we could alert the broker's operators. However, once the issue is resolved, our app will just automatically pick right back up where it left off. At a very high level, I'd propose the following failure handling protocol: 1. *retriable error*: A transient failure (like a timeout exception). We just put the task to the side, and try to do work for the next task. 2. *recoverable error*: A permanent failure (like getting fenced), than we can recover from by re-joining the cluster. We try to close and re-open the producer, or initiate a rebalance to re-join the cluster, depending on the exact nature of the failure. 3. *fatal error*: A permanent failure that requires human intervention to resolve. Something like discovering that we're in the middle of upgrading to an incompatible topology, or that the application itself is invalid for some other reason. Attempting to continue could result in data corruption, so we should just shut down the whole application so that someone can figure out what's wrong and fix it. > Gracefully handle timeout exceptions on Kafka Streams > - > > Key: KAFKA-9274 > URL: https://issues.apache.org/jira/browse/KAFKA-9274 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Major > > Right now streams don't treat timeout exception as retriable in general by > throwing it to the application level. If not handled by the user, this would > kill the stream thread unfortunately. > In fact, timeouts happen mostly due to network issue or server side > unavailability. Hard failure on client seems to be an over-kill. > We would like to discuss what's the best practice to handle timeout > exceptions on Streams. 
The current state is still brainstorming and > consolidate all the cases that contain timeout exception within this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
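A sketch of the three-bucket protocol proposed above, purely to make the categories concrete; the mapping of specific exception types to buckets is an illustrative assumption, not a description of what Streams currently does.

{code:java}
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RetriableException;

enum FailureKind {
    RETRIABLE,   // park the task, work on others, retry the same operation later
    RECOVERABLE, // re-create the producer / rejoin the group, then continue
    FATAL        // shut the application down so a human can intervene
}

final class FailureClassifier {

    static FailureKind classify(final Throwable error) {
        if (error instanceof RetriableException) {           // e.g. TimeoutException
            return FailureKind.RETRIABLE;
        }
        if (error instanceof ProducerFencedException || error instanceof CommitFailedException) {
            return FailureKind.RECOVERABLE;
        }
        return FailureKind.FATAL;                             // e.g. an incompatible topology upgrade
    }
}
{code}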
[jira] [Comment Edited] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989195#comment-16989195 ] John Roesler edited comment on KAFKA-9274 at 12/5/19 9:47 PM: -- Thanks for starting this, [~bchen225242]! One thing we should document is that the current approach is actually intentional. Streams essentially delegates the handling of transient failures into the clients (Consumer and Producer and Admin). Accordingly, the current advice is to set the client timeouts high enough to tolerate transient outages. However, I agree with you that this is not the best approach. Setting the timeout high enough to be resilient to most network outages has the perverse effect of allowing StreamThreads to get hung up on an individual operation for that amount of time, harming the overall ability of the system to make progress on all inputs. For example, what if only one broker is unhealthy/unavailable? Rather than sit around indefinitely waiting to commit on the partition that broker is leader for, we could attempt to make progress on other tasks, and then try to commit the stuck one again later. Another issue is self-healing. Suppose that the broker cluster becomes unavailable. If we're running an application in which some or all of the threads will die from timeouts, then whenever the broker's operators _do_ bring it back up, we would have to actually restart the application to recover. Maybe this doesn't seem so bad, but it you're in a large organization operating 100 Streams apps, it sounds like a pretty big pain to me. Conversely, if Streams were to just log a warning each time it got a timeout, but continue trying to make progress, then we would know that there is something wrong (because we're monitoring the app for warnings), so we could alert the broker's operators. However, once the issue is resolved, our app will just automatically pick right back up where it left off. At a very high level, I'd propose the following failure handling protocol: 1. *retriable error*: A transient failure (like a timeout exception). We just put the task to the side, and try to do work for the next task. When we come back around to the current task, we should try to repeat the operation. I.e., if it was a timeout on commit, we should try to commit again when we come back around, as opposed to just continuing to process that task without committing. 2. *recoverable error*: A permanent failure (like getting fenced), than we can recover from by re-joining the cluster. We try to close and re-open the producer, or initiate a rebalance to re-join the cluster, depending on the exact nature of the failure. 3. *fatal error*: A permanent failure that requires human intervention to resolve. Something like discovering that we're in the middle of upgrading to an incompatible topology, or that the application itself is invalid for some other reason. Attempting to continue could result in data corruption, so we should just shut down the whole application so that someone can figure out what's wrong and fix it. was (Author: vvcephei): Thanks for starting this, [~bchen225242]! One thing we should document is that the current approach is actually intentional. Streams essentially delegates the handling of transient failures into the clients (Consumer and Producer and Admin). Accordingly, the current advice is to set the client timeouts high enough to tolerate transient outages. However, I agree with you that this is not the best approach. 
Setting the timeout high enough to be resilient to most network outages has the perverse effect of allowing StreamThreads to get hung up on an individual operation for that amount of time, harming the overall ability of the system to make progress on all inputs. For example, what if only one broker is unhealthy/unavailable? Rather than sit around indefinitely waiting to commit on the partition that broker is leader for, we could attempt to make progress on other tasks, and then try to commit the stuck one again later. Another issue is self-healing. Suppose that the broker cluster becomes unavailable. If we're running an application in which some or all of the threads will die from timeouts, then whenever the broker's operators _do_ bring it back up, we would have to actually restart the application to recover. Maybe this doesn't seem so bad, but it you're in a large organization operating 100 Streams apps, it sounds like a pretty big pain to me. Conversely, if Streams were to just log a warning each time it got a timeout, but continue trying to make progress, then we would know that there is something wrong (because we're monitoring the app for warnings), so we could alert the broker's operators. However, once the issue is resolved, our app will just automatically pick right back up where it left off. At a very high level,
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989201#comment-16989201 ] John Roesler commented on KAFKA-9225: - Hey [~guozhang], Yes, I think it would be ok to plan to upgrade to RocksDB 3.x as part of the AK 3.0 release. In 2.4, we are adding a log message warning anyone who uses the RocksDB APIs in question that this change will happen in the next major release. (We're not in a position to add deprecation annotations, since it's directly exposed RocksDB classes. Since we don't know when 3.0 would be, exactly, I was proposing a way that we could support both versions of RocksDB as soon as the very next release. But it's a cost/benefit question as to whether we should do it. > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Blocker > Labels: incompatible > Fix For: 3.0.0 > > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. The instance will be in error state and should be closed. > (org.apach e.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni13777546368576524 84.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analyze:* > This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 > native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? Should there
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989206#comment-16989206 ] Sophie Blee-Goldman commented on KAFKA-9225: At that point we might as well just [fork RocksDB |https://issues.apache.org/jira/browse/KAFKA-9148] ;) > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Blocker > Labels: incompatible > Fix For: 3.0.0 > > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. The instance will be in error state and should be closed. 
> (org.apach e.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni13777546368576524 84.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analyze:* > This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 > native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? Should there be any kind of tests to execute, please > kindly point me. Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989207#comment-16989207 ] Guozhang Wang commented on KAFKA-9013: -- Happens again: https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/3756/consoleFull Different error message: {code} 12:47:24 org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > testReplication FAILED 12:47:24 java.lang.RuntimeException: Could not find enough records. found 0, expected 1 12:47:24 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:334) 12:47:24 at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:234) {code} > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. 
> WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime
[jira] [Commented] (KAFKA-9260) Improve Serde "push down" and "wrapping"
[ https://issues.apache.org/jira/browse/KAFKA-9260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989216#comment-16989216 ] Guozhang Wang commented on KAFKA-9260: -- One cause of the current design is that we used to build the topology first, then pass in the config which contains the default serdes, hence it is done in two consecutive steps. However, now with optimization we already have a StreamsBuilder.build(Properties) override, and in practice it would not be too aggressive to "always" require the configs to be ready when building the topology anyways. So off the top of my head I think: 1) removing the other `StreamsBuilder.build()` overload without params, to always enforce that configs are passed in; 2) removing the `KafkaStreams` constructor that takes both topology and config in favor of one that takes only the topology, since the topology should already contain the configs. Then in the topology building phase, we can have a uniform framework: "if there's a serde inheritable, use it; otherwise use the default serde from config directly and apply any wrapping logic if necessary". WDYT? > Improve Serde "push down" and "wrapping" > > > Key: KAFKA-9260 > URL: https://issues.apache.org/jira/browse/KAFKA-9260 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > The Kafka Streams DSL supports a "serde push down" feature to let downstream > operators inherit upstream serdes if key and/or value are not modified. > Furthermore, some operators "wrap" user-specified serdes internally (eg, > windowed aggregations wrap the user-specified key-serde with a time/session > window serde – some stores use `ValueAndTimestampSerde` and foreign-key joins > also use some internal wrappers). > The current implementation faces a couple of issues, because the "serde push > down" feature is a DSL-level feature that is used when the Topology is > generated. Furthermore, "serde wrapping" is an operator-specific feature, not > a DSL concept per se. At runtime, neither "push down" nor "wrapping" are known > concepts. > This design implies that if users specify serdes, wrapping and push down > work as expected. However, if we fall back to default serdes, special care > needs to be taken: for example, some operators do not apply the wrapping logic > at translation time, and there is additional code that does the wrapping > of default serdes at runtime. Another approach would be to wrap a null-Serde, > and update the wrapper later (ie, overwrite `null` with the default serde > from the config). > Overall, the current design leads to bugs (eg, KAFKA-9248 and KAFKA-9259), > and user confusion about how it actually works and when/where to specify serdes. > Hence, we should consider reworking how we do serde push down and/or wrapping. -- This message was sent by Atlassian Jira (v8.3.4#803005)
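The config-aware build path already exists, so the proposal mostly amounts to removing the other overloads. A small sketch (topic names and config values are made up) of the flow in which configs, and hence default serdes, are available while the topology is built:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class BuildWithConfigs {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-pushdown-demo"); // assumed
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // assumed
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // no explicit serdes: defaults must be resolved somewhere

        // StreamsBuilder.build(Properties) already exists (added for topology optimization);
        // under the proposal it would be the only build() variant, so default serdes are known
        // at topology-construction time rather than patched in later.
        final Topology topology = builder.build(props);

        // Today KafkaStreams still takes the config separately; under the proposal it would
        // only need the topology, which would already carry the configs.
        new KafkaStreams(topology, props).start();
    }
}
{code}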
[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64
[ https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989225#comment-16989225 ] John Roesler commented on KAFKA-9225: - I see what you did there :) Forking RocksDB is also a viable option to deal with problems like the above, but in seriousness, I don't see offering different "plug in" modules with different state store implementations as "just about the same loe" as forking Rocks. They may actually both be useful approaches, though. > kafka fail to run on linux-aarch64 > -- > > Key: KAFKA-9225 > URL: https://issues.apache.org/jira/browse/KAFKA-9225 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: jiamei xie >Priority: Blocker > Labels: incompatible > Fix For: 3.0.0 > > Attachments: compat_report.html > > > *Steps to reproduce:* > 1. Download Kafka latest source code > 2. Build it with gradle > 3. Run > [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]] > when running bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with > the following error message > {code:java} > xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh > org.apache.kafka.streams.examples.wordcount.WordCountDemo > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied > but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2019-11-19 15:42:24,278] ERROR stream-client > [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads > have died. 
The instance will be in error state and should be closed. > (org.apach e.kafka.streams.KafkaStreams) > Exception in thread > "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" > java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: > /tmp/librocksdbjni13777546368576524 84.so: > cannot open shared object file: No such file or directory (Possible cause: > can't load AMD 64-bit .so on a AARCH64-bit platform) > {code} > *Analyze:* > This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 > native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from > [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix > this problem. > Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 > and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to > 6.3.6 in upstream? Should there be any kind of tests to execute, please > kindly point me. Thanks a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9148) Consider forking RocksDB for Streams
[ https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9148: --- Description: We recently upgraded our RocksDB dependency to 5.18 for its memory-management abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, someone from Flink recently discovered a ~8% [performance regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all versions 5.18+ (up through the current newest version, 6.2.2). Flink was able to react to this by downgrading to 5.17 and [picking the WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their fork (fRocksDB). Due to this and other reasons enumerated below, we should consider also forking our own RocksDB for Streams. Pros: * We can avoid passing sudden breaking changes on to our users, such removal of methods with no deprecation period (see discussion on KAFKA-8897) * We can pick whichever version has the best performance for our needs, and pick over any new features, metrics, etc that we need to use rather than being forced to upgrade (and breaking user code, introducing regression, etc) * Support for some architectures does not exist in all versions, making RocksDB – Streams RocksDB versions are architecutres * The Java API seems to be a very low priority to the rocksdb folks. ** They leave out critical functionality, features, and configuration options that have been in the c++ API for a very long time ** Those that do make it over often have random gaps in the API such as setters but no getters (see [rocksdb PR #5186|https://github.com/facebook/rocksdb/pull/5186]) ** Others are poorly designed and require too many trips across the JNI, making otherwise incredibly useful features prohibitively expensive. *** [Custom comparator|#issuecomment-83145980]]: a custom comparator could significantly improve the performance of session windows. This is trivial to do but given the high performance cost of crossing the jni, it is currently only practical to use a c++ comparator *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not currently used by Streams but a commonly requested feature, and may also allow improved range queries ** Even when an external contributor develops a solution for poorly performing Java functionality and helpfully tries to contribute their patch back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR #2283|https://github.com/facebook/rocksdb/pull/2283]) Cons: * More work (not to be trivialized, the truth is we don't and can't know how much extra work this will ultimately be) Given that we rarely upgrade the Rocks dependency, use only some fraction of its features, and would need or want to make only minimal changes ourselves, it seems like we could actually get away with very little extra work by forking rocksdb. Note that as of this writing the frocksdb repo has only needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to maintain this will only grow over time, so we should think carefully about whether and when to start taking on this potential burden. was: We recently upgraded our RocksDB dependency to 5.18 for its memory-management abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, someone from Flink recently discovered a ~8% [performance regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all versions 5.18+ (up through the current newest version, 6.2.2). 
Flink was able to react to this by downgrading to 5.17 and [picking the WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their fork (fRocksDB). Due to this and other reasons enumerated below, we should consider also forking our own RocksDB for Streams. Pros: * We can avoid passing sudden breaking changes on to our users, such removal of methods with no deprecation period (see discussion on KAFKA-8897) * We can pick whichever version has the best performance for our needs, and pick over any new features, metrics, etc that we need to use rather than being forced to upgrade (and breaking user code, introducing regression, etc) * The Java API seems to be a very low priority to the rocksdb folks. ** They leave out critical functionality, features, and configuration options that have been in the c++ API for a very long time ** Those that do make it over often have random gaps in the API such as setters but no getters (see [rocksdb PR #5186|https://github.com/facebook/rocksdb/pull/5186]) ** Others are poorly designed and require too many trips across the JNI, making otherwise incredibly useful features prohibitively expensive. *** [Custom comparator|#issuecomment-83145980]]: a custom comparator could significantly improve the performance of session windows. This is trivial to do but given the high perform
[jira] [Updated] (KAFKA-9148) Consider forking RocksDB for Streams
[ https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9148: --- Description: We recently upgraded our RocksDB dependency to 5.18 for its memory-management abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, someone from Flink recently discovered a ~8% [performance regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all versions 5.18+ (up through the current newest version, 6.2.2). Flink was able to react to this by downgrading to 5.17 and [picking the WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their fork (fRocksDB). Due to this and other reasons enumerated below, we should consider also forking our own RocksDB for Streams. Pros: * We can avoid passing sudden breaking changes on to our users, such removal of methods with no deprecation period (see discussion on KAFKA-8897) * We can pick whichever version has the best performance for our needs, and pick over any new features, metrics, etc that we need to use rather than being forced to upgrade (and breaking user code, introducing regression, etc) * Support for some architectures does not exist in all RocksDB versions, making Streams completely unusable for some users until we can upgrade the rocksdb dependency to one that supports their specific case * The Java API seems to be a very low priority to the rocksdb folks. ** They leave out critical functionality, features, and configuration options that have been in the c++ API for a very long time ** Those that do make it over often have random gaps in the API such as setters but no getters (see [rocksdb PR #5186|https://github.com/facebook/rocksdb/pull/5186]) ** Others are poorly designed and require too many trips across the JNI, making otherwise incredibly useful features prohibitively expensive. *** [Custom comparator|#issuecomment-83145980]]: a custom comparator could significantly improve the performance of session windows. This is trivial to do but given the high performance cost of crossing the jni, it is currently only practical to use a c++ comparator *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not currently used by Streams but a commonly requested feature, and may also allow improved range queries ** Even when an external contributor develops a solution for poorly performing Java functionality and helpfully tries to contribute their patch back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR #2283|https://github.com/facebook/rocksdb/pull/2283]) Cons: * More work (not to be trivialized, the truth is we don't and can't know how much extra work this will ultimately be) Given that we rarely upgrade the Rocks dependency, use only some fraction of its features, and would need or want to make only minimal changes ourselves, it seems like we could actually get away with very little extra work by forking rocksdb. Note that as of this writing the frocksdb repo has only needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to maintain this will only grow over time, so we should think carefully about whether and when to start taking on this potential burden. was: We recently upgraded our RocksDB dependency to 5.18 for its memory-management abilities (namely the WriteBufferManager, see KAFKA-8215). 
Unfortunately, someone from Flink recently discovered a ~8% [performance regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all versions 5.18+ (up through the current newest version, 6.2.2). Flink was able to react to this by downgrading to 5.17 and [picking the WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their fork (fRocksDB). Due to this and other reasons enumerated below, we should consider also forking our own RocksDB for Streams. Pros: * We can avoid passing sudden breaking changes on to our users, such removal of methods with no deprecation period (see discussion on KAFKA-8897) * We can pick whichever version has the best performance for our needs, and pick over any new features, metrics, etc that we need to use rather than being forced to upgrade (and breaking user code, introducing regression, etc) * Support for some architectures does not exist in all versions, making RocksDB – Streams RocksDB versions are architecutres * The Java API seems to be a very low priority to the rocksdb folks. ** They leave out critical functionality, features, and configuration options that have been in the c++ API for a very long time ** Those that do make it over often have random gaps in the API such as setters but no getters (see [rocksdb PR #5186|https://github.com/facebook/rocksdb/pull/5186]) ** Others are poorly designed and require too many trips across the JNI, making otherwise incredibly useful feature
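To make the JNI-cost point above concrete, here is a minimal, JDK-only sketch of the kind of ordering a session-window comparator would implement. The key layout (serialized key bytes followed by an 8-byte big-endian window timestamp) is an assumption for illustration and is not Streams' actual SessionKeySchema; the point is that if this logic lived in Java, RocksDB would have to cross the JNI for every single key comparison, which is exactly the cost the description calls prohibitive and why only a C++ comparator is practical today.
{code:java}
import java.nio.ByteBuffer;
import java.util.Comparator;

// Illustrative only: assumes composite keys laid out as [key bytes][8-byte big-endian timestamp]
// and that every key has at least 8 trailing bytes. Not Streams' real SessionKeySchema.
public class WindowedKeyComparator implements Comparator<byte[]> {
    @Override
    public int compare(byte[] a, byte[] b) {
        int aKeyLen = a.length - Long.BYTES;
        int bKeyLen = b.length - Long.BYTES;
        int minLen = Math.min(aKeyLen, bKeyLen);
        // Lexicographic, unsigned comparison of the serialized key prefix.
        for (int i = 0; i < minLen; i++) {
            int cmp = Integer.compare(Byte.toUnsignedInt(a[i]), Byte.toUnsignedInt(b[i]));
            if (cmp != 0) {
                return cmp;
            }
        }
        if (aKeyLen != bKeyLen) {
            return Integer.compare(aKeyLen, bKeyLen);
        }
        // Same key: order by the big-endian timestamp suffix.
        long aTs = ByteBuffer.wrap(a, aKeyLen, Long.BYTES).getLong();
        long bTs = ByteBuffer.wrap(b, bKeyLen, Long.BYTES).getLong();
        return Long.compare(aTs, bTs);
    }
}
{code}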
[jira] [Created] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group
Badai Aqrandista created KAFKA-9275: --- Summary: Print assignment and IP address in the log message when consumer leaves/removed from the group Key: KAFKA-9275 URL: https://issues.apache.org/jira/browse/KAFKA-9275 Project: Kafka Issue Type: Improvement Reporter: Badai Aqrandista In order to simplify identification of which member is causing rebalance, can we add the IP address and the list of topic-partitions assigned to the member that leaves/removed from the group? And especially in the "reason" string for rebalance: https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 This will allow a much faster investigation when a consumer group is stuck in rebalancing state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
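For illustration only, a hedged sketch of the kind of enriched "reason" string the ticket is asking for. This is a hypothetical Java helper, not the actual coordinator code (which lives in GroupCoordinator.scala), and the method and field names here are made up for the example.
{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: a rebalance reason that carries the member id, its client host/IP,
// and the topic-partitions it had assigned, as requested in this ticket.
public final class RebalanceReasons {

    public static String memberLeftReason(String memberId, String clientHost, List<String> assignedPartitions) {
        return String.format("removing member %s on %s (assigned: %s) on LeaveGroup",
                memberId, clientHost, String.join(", ", assignedPartitions));
    }

    public static void main(String[] args) {
        // Prints: removing member consumer-1-8c5d... on /10.0.0.12 (assigned: orders-3, orders-7) on LeaveGroup
        System.out.println(memberLeftReason("consumer-1-8c5d...", "/10.0.0.12",
                Arrays.asList("orders-3", "orders-7")));
    }
}
{code}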
[jira] [Created] (KAFKA-9276) Unclear warning due to empty throttled fetch response
Bob Barrett created KAFKA-9276: -- Summary: Unclear warning due to empty throttled fetch response Key: KAFKA-9276 URL: https://issues.apache.org/jira/browse/KAFKA-9276 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.3.0 Reporter: Bob Barrett With a version 1.0.0 consumer talking to a 2.3.0 broker, the following WARN-level statement is logged when a request is throttled: {code:java} Ignoring fetch response containing partitions [] since it does not match the requested partitions [topic-29, topic-25, topic-21, topic-17, topic-33]{code} It appears that the 1.0.0 consumer expects fetch data for all partitions, but the throttled response is empty, causing an unclear warning message. This may be a regression from KIP-219, which changed the throttling behavior to return the empty fetch response. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group
[ https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Badai Aqrandista updated KAFKA-9275: Description: In order to simplify identification of which member is causing rebalance, can we add the IP address and the list of topic-partitions assigned to the member that leaves/removed from the group? And especially in the "reason" string for rebalance: https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 This will allow a much faster investigation when a consumer group is stuck in rebalancing state. Even better if we can send this reason to all other members that are affected by the rebalance. Currently, it takes forever to know what caused a rebalance when a consumer group continuously goes into rebalancing state due to one member keeps exceeding max.poll.intervals.ms. was: In order to simplify identification of which member is causing rebalance, can we add the IP address and the list of topic-partitions assigned to the member that leaves/removed from the group? And especially in the "reason" string for rebalance: https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 This will allow a much faster investigation when a consumer group is stuck in rebalancing state. > Print assignment and IP address in the log message when consumer > leaves/removed from the group > -- > > Key: KAFKA-9275 > URL: https://issues.apache.org/jira/browse/KAFKA-9275 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Priority: Minor > > In order to simplify identification of which member is causing rebalance, can > we add the IP address and the list of topic-partitions assigned to the member > that leaves/removed from the group? > And especially in the "reason" string for rebalance: > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 > This will allow a much faster investigation when a consumer group is stuck in > rebalancing state. > Even better if we can send this reason to all other members that are affected > by the rebalance. Currently, it takes forever to know what caused a rebalance > when a consumer group continuously goes into rebalancing state due to one > member keeps exceeding max.poll.intervals.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
[ https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989287#comment-16989287 ] Michael Jaschob commented on KAFKA-9212: Chiming in here, I believe we've experienced the same error. I've been able to reproduce the behavior quite simply, as follows: - 3-broker cluster (running Apache Kafka 2.3.1) - one partition with replica assignment (0, 1, 2) - booted fourth broker (id 3) - initiated partition reassignment from (0, 1, 2) to (0, 1, 2, 3) with a very low throttle (for testing) As soon as the assignment begins, a 2.3.0 console consumer simply hangs when started. A 1.1.1 consumer does not have any issues. I see this in leader broker's request logs: {code:java} [2019-12-05 16:38:36,790] DEBUG Completed request:RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-1, correlationId=1529) -- {replica_id=-1,isolation_level=0,topics=[{topic=DataPlatform.CGSynthTests,partitions=[{partition=0,current_leader_epoch=0,timestamp=-1}]}]},response:{throttle_time_ms=0,responses=[{topic=DataPlatform.CGSynthTests,partition_responses=[{partition=0,error_code=74,timestamp=-1,offset=-1,leader_epoch=-1}]}]} from connection 172.22.15.67:9092-172.22.23.98:46974-9;totalTime:0.27,requestQueueTime:0.044,localTime:0.185,remoteTime:0.0,throttleTime:0.036,responseQueueTime:0.022,sendTime:0.025,securityProtocol:PLAINTEXT,principal:User:data-pipeline-monitor,listener:PLAINTEXT (kafka.request.logger) {code} Note the producer fenced error code for list offsets, as in the original report. Once the reassignment completes, the 2.3.1 console consumer starts working. I've also tried a different reassignment (0, 1, 2) -> (3, 1, 2) with the same results. Where we stand right now is we can't initiate partition reassignments in our production cluster without paralyzing a Spark application (using 2.3.0 client libs under the hood). Downgrading the Kafka client libs there isn't possible since they are part of the Spark assembly. Any pointers on what the issue might be here? Struggling to understand the bug because it seems like any partition reassignment breaks LIST_OFFSETS requests from 2.3 clients, but that just seems to be too severe a problem to have gone unnoticed for so long. Even ideas for a workaround would help here, since we don't see a path to do partition reassignments without causing a production incident right now. 
> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest > -- > > Key: KAFKA-9212 > URL: https://issues.apache.org/jira/browse/KAFKA-9212 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager >Affects Versions: 2.3.0 > Environment: Linux >Reporter: Yannick >Priority: Critical > > When running Kafka connect s3 sink connector ( confluent 5.3.0), after one > broker got restarted (leaderEpoch updated at this point), the connect worker > crashed with the following error : > [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, > groupId=connect-ls] Uncaught exception in herder work thread, exiting: > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253) > org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by > times in 30003ms > > After investigation, it seems it's because it got fenced when sending > ListOffsetRequest in loop and then got timed out , as follows : > [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, > replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, > maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, > isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 > rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905) > [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Attempt to fetch offsets for partition > connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. > (org.apache.kafka.clients.consumer.internals.Fetcher:985) > > The above happens multiple times until timeout. > > According to the debugs, the consumer always get a leaderEpoch of 1 for this > topic when starting up : > > [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Updating last seen epoch from null to 1 for partition > connect_ls_config-0 (org.apache.kafka.clients.Metadata:178) > > > But according to our brokers log, the leaderEpoch should be 2, as follows : > > [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] > connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader > Epoch was: 1 (kafka.cluster.Partition) > > > This make impossible t
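As a rough way to check whether a consumer is stuck retrying FENCED_LEADER_EPOCH on ListOffsets during a reassignment, one can bound the wait instead of letting the client hang the way the console consumer does. A minimal probe sketch follows; the broker address is a placeholder and the topic/partition is taken from the request log in the comment above.
{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ListOffsetsProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("DataPlatform.CGSynthTests", 0); // partition from the request log above
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            try {
                // Bounded wait: if the broker keeps answering FENCED_LEADER_EPOCH,
                // this times out instead of hanging indefinitely.
                Map<TopicPartition, Long> begin =
                        consumer.beginningOffsets(Collections.singletonList(tp), Duration.ofSeconds(30));
                System.out.println("ListOffsets succeeded: " + begin);
            } catch (TimeoutException e) {
                System.out.println("ListOffsets still failing after 30s, likely stuck in FENCED_LEADER_EPOCH retries: " + e);
            }
        }
    }
}
{code}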
[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease
[ https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] li xiangyuan updated KAFKA-9211: Attachment: producer.node.latency.png > kafka upgrade 2.3.0 cause produce speed decrease > > > Key: KAFKA-9211 > URL: https://issues.apache.org/jira/browse/KAFKA-9211 > Project: Kafka > Issue Type: Bug > Components: controller, producer >Affects Versions: 2.3.0 >Reporter: li xiangyuan >Priority: Critical > Attachments: broker-jstack.txt, producer-jstack.txt, > producer.node.latency.png > > > Recently we try upgrade kafka from 0.10.0.1 to 2.3.0. > we have 15 clusters in production env, each one has 3~6 brokers. > we know kafka upgrade should: > 1.replcae code to 2.3.0.jar and restart all brokers one by one > 2.unset inter.broker.protocol.version=0.10.0.1 and restart all brokers > one by one > 3.unset log.message.format.version=0.10.0.1 and restart all brokers one > by one > > for now we have already done step 1 & 2 in 12 clusters.but when we try to > upgrade left clusters (already done step 1) in step 2, we found some topics > drop produce speed badly. > we have research this issue for long time, since we couldn't test it in > production environment and we couldn't reproduce in test environment, we > couldn't find the root cause. > now we only could describe the situation in detail as i know, hope anyone > could help us. > > 1.because bug KAFKA-8653, i add code below in KafkaApis.scala > handleJoinGroupRequest function: > {code:java} > if (rebalanceTimeoutMs <= 0) { > rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs > }{code} > 2.one cluster upgrade failed has 6 8C16G brokers, about 200 topics with 2 > replicas,every broker keep 3000+ partitions and 1500+ leader partition, but > most of them has very low produce message speed,about less than > 50messages/sec, only one topic with 300 partitions has more than 2500 > message/sec with more than 20 consumer groups consume message from it. > so this whole cluster produce 4K messages/sec , 11m Bytes in /sec,240m Bytes > out /sec.and more than 90% traffic made by that topic has 2500messages/sec. > when we unset 5 or 6 servers' inter.broker.protocol.version=0.10.0.1 and > restart, this topic produce message drop to about 200messages/sec, i don't > know whether the way we use could tirgger any problem. > 3.we use kafka wrapped by spring-kafka and set kafkatemplate's > autoFlush=true, so each producer.send execution will execute producer.flush > immediately too.i know flush method will decrease produce performance > dramaticlly, but at least it seems nothing wrong before upgrade step 2. but > i doubt whether it's a problem now after upgrade. > 4.I noticed when produce speed decrease, some consumer group has large > message lag still consume message without any consume speed change or > decrease, so I guess only producerequest speed will drop down,but > fetchrequest not. > 5.we haven't set any throttle configuration, and all producers' acks=1(so > it's not broker replica fetch slow), and when this problem triggered, both > sever & producers cpu usage down, and servers' ioutil keep less than 30% ,so > it shuldn't be a hardware problem. > 6.this event triggered often(almost 100%) most brokers has done upgrade step > 2,then after a auto leader replica election executed, then we can observe > produce speed drop down,and we have to downgrade brokers(set > inter.broker.protocol.version=0.10.0.1)and restart brokers one by one,then it > could be normal. 
some cluster have to downgrade all brokers,but some cluster > could left 1 or 2 brokers without downgrade, i notice that the broker not > need downgrade is the controller. > 7.I have print jstack for producer & servers. although I do this not the same > cluster, but we can notice that their thread seems really in idle stat. > 8.both 0.10.0.1 & 2.3.0 kafka-client will trigger this problem too. > 8.unless the largest one topic will drop produce speed certainly, other topic > will drop produce speed randomly. maybe topicA will drop speed in first > upgrade attempt but next not, and topicB not drop speed in first attemp but > dropped when do another attempt. > 9.in fact, the largest cluster, has the same topic & group usage scenario > mentioned above, but the largest topic has 1w2 messages/sec,will upgrade fail > in step 1(just use 2.3.0.jar) > any help would be grateful, thx, i'm very sad now... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease
[ https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] li xiangyuan updated KAFKA-9211: Attachment: nodelay.txt > kafka upgrade 2.3.0 cause produce speed decrease > > > Key: KAFKA-9211 > URL: https://issues.apache.org/jira/browse/KAFKA-9211 > Project: Kafka > Issue Type: Bug > Components: controller, producer >Affects Versions: 2.3.0 >Reporter: li xiangyuan >Priority: Critical > Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, > producer-jstack.txt, producer.node.latency.png > > > Recently we try upgrade kafka from 0.10.0.1 to 2.3.0. > we have 15 clusters in production env, each one has 3~6 brokers. > we know kafka upgrade should: > 1.replcae code to 2.3.0.jar and restart all brokers one by one > 2.unset inter.broker.protocol.version=0.10.0.1 and restart all brokers > one by one > 3.unset log.message.format.version=0.10.0.1 and restart all brokers one > by one > > for now we have already done step 1 & 2 in 12 clusters.but when we try to > upgrade left clusters (already done step 1) in step 2, we found some topics > drop produce speed badly. > we have research this issue for long time, since we couldn't test it in > production environment and we couldn't reproduce in test environment, we > couldn't find the root cause. > now we only could describe the situation in detail as i know, hope anyone > could help us. > > 1.because bug KAFKA-8653, i add code below in KafkaApis.scala > handleJoinGroupRequest function: > {code:java} > if (rebalanceTimeoutMs <= 0) { > rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs > }{code} > 2.one cluster upgrade failed has 6 8C16G brokers, about 200 topics with 2 > replicas,every broker keep 3000+ partitions and 1500+ leader partition, but > most of them has very low produce message speed,about less than > 50messages/sec, only one topic with 300 partitions has more than 2500 > message/sec with more than 20 consumer groups consume message from it. > so this whole cluster produce 4K messages/sec , 11m Bytes in /sec,240m Bytes > out /sec.and more than 90% traffic made by that topic has 2500messages/sec. > when we unset 5 or 6 servers' inter.broker.protocol.version=0.10.0.1 and > restart, this topic produce message drop to about 200messages/sec, i don't > know whether the way we use could tirgger any problem. > 3.we use kafka wrapped by spring-kafka and set kafkatemplate's > autoFlush=true, so each producer.send execution will execute producer.flush > immediately too.i know flush method will decrease produce performance > dramaticlly, but at least it seems nothing wrong before upgrade step 2. but > i doubt whether it's a problem now after upgrade. > 4.I noticed when produce speed decrease, some consumer group has large > message lag still consume message without any consume speed change or > decrease, so I guess only producerequest speed will drop down,but > fetchrequest not. > 5.we haven't set any throttle configuration, and all producers' acks=1(so > it's not broker replica fetch slow), and when this problem triggered, both > sever & producers cpu usage down, and servers' ioutil keep less than 30% ,so > it shuldn't be a hardware problem. > 6.this event triggered often(almost 100%) most brokers has done upgrade step > 2,then after a auto leader replica election executed, then we can observe > produce speed drop down,and we have to downgrade brokers(set > inter.broker.protocol.version=0.10.0.1)and restart brokers one by one,then it > could be normal. 
some cluster have to downgrade all brokers,but some cluster > could left 1 or 2 brokers without downgrade, i notice that the broker not > need downgrade is the controller. > 7.I have print jstack for producer & servers. although I do this not the same > cluster, but we can notice that their thread seems really in idle stat. > 8.both 0.10.0.1 & 2.3.0 kafka-client will trigger this problem too. > 8.unless the largest one topic will drop produce speed certainly, other topic > will drop produce speed randomly. maybe topicA will drop speed in first > upgrade attempt but next not, and topicB not drop speed in first attemp but > dropped when do another attempt. > 9.in fact, the largest cluster, has the same topic & group usage scenario > mentioned above, but the largest topic has 1w2 messages/sec,will upgrade fail > in step 1(just use 2.3.0.jar) > any help would be grateful, thx, i'm very sad now... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease
[ https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] li xiangyuan updated KAFKA-9211: Attachment: ackdelay.txt > kafka upgrade 2.3.0 cause produce speed decrease > > > Key: KAFKA-9211 > URL: https://issues.apache.org/jira/browse/KAFKA-9211 > Project: Kafka > Issue Type: Bug > Components: controller, producer >Affects Versions: 2.3.0 >Reporter: li xiangyuan >Priority: Critical > Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, > producer-jstack.txt, producer.node.latency.png > > > Recently we try upgrade kafka from 0.10.0.1 to 2.3.0. > we have 15 clusters in production env, each one has 3~6 brokers. > we know kafka upgrade should: > 1.replcae code to 2.3.0.jar and restart all brokers one by one > 2.unset inter.broker.protocol.version=0.10.0.1 and restart all brokers > one by one > 3.unset log.message.format.version=0.10.0.1 and restart all brokers one > by one > > for now we have already done step 1 & 2 in 12 clusters.but when we try to > upgrade left clusters (already done step 1) in step 2, we found some topics > drop produce speed badly. > we have research this issue for long time, since we couldn't test it in > production environment and we couldn't reproduce in test environment, we > couldn't find the root cause. > now we only could describe the situation in detail as i know, hope anyone > could help us. > > 1.because bug KAFKA-8653, i add code below in KafkaApis.scala > handleJoinGroupRequest function: > {code:java} > if (rebalanceTimeoutMs <= 0) { > rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs > }{code} > 2.one cluster upgrade failed has 6 8C16G brokers, about 200 topics with 2 > replicas,every broker keep 3000+ partitions and 1500+ leader partition, but > most of them has very low produce message speed,about less than > 50messages/sec, only one topic with 300 partitions has more than 2500 > message/sec with more than 20 consumer groups consume message from it. > so this whole cluster produce 4K messages/sec , 11m Bytes in /sec,240m Bytes > out /sec.and more than 90% traffic made by that topic has 2500messages/sec. > when we unset 5 or 6 servers' inter.broker.protocol.version=0.10.0.1 and > restart, this topic produce message drop to about 200messages/sec, i don't > know whether the way we use could tirgger any problem. > 3.we use kafka wrapped by spring-kafka and set kafkatemplate's > autoFlush=true, so each producer.send execution will execute producer.flush > immediately too.i know flush method will decrease produce performance > dramaticlly, but at least it seems nothing wrong before upgrade step 2. but > i doubt whether it's a problem now after upgrade. > 4.I noticed when produce speed decrease, some consumer group has large > message lag still consume message without any consume speed change or > decrease, so I guess only producerequest speed will drop down,but > fetchrequest not. > 5.we haven't set any throttle configuration, and all producers' acks=1(so > it's not broker replica fetch slow), and when this problem triggered, both > sever & producers cpu usage down, and servers' ioutil keep less than 30% ,so > it shuldn't be a hardware problem. > 6.this event triggered often(almost 100%) most brokers has done upgrade step > 2,then after a auto leader replica election executed, then we can observe > produce speed drop down,and we have to downgrade brokers(set > inter.broker.protocol.version=0.10.0.1)and restart brokers one by one,then it > could be normal. 
some cluster have to downgrade all brokers,but some cluster > could left 1 or 2 brokers without downgrade, i notice that the broker not > need downgrade is the controller. > 7.I have print jstack for producer & servers. although I do this not the same > cluster, but we can notice that their thread seems really in idle stat. > 8.both 0.10.0.1 & 2.3.0 kafka-client will trigger this problem too. > 8.unless the largest one topic will drop produce speed certainly, other topic > will drop produce speed randomly. maybe topicA will drop speed in first > upgrade attempt but next not, and topicB not drop speed in first attemp but > dropped when do another attempt. > 9.in fact, the largest cluster, has the same topic & group usage scenario > mentioned above, but the largest topic has 1w2 messages/sec,will upgrade fail > in step 1(just use 2.3.0.jar) > any help would be grateful, thx, i'm very sad now... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease
[ https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989369#comment-16989369 ] li xiangyuan commented on KAFKA-9211: - After some testing, I believe we have successfully reproduced this problem in our test environment. Below is what I did: 1. I started 6 8C16G brokers (kafka 2.3.0) with config: {code:java}
KAFKA_HEAP_OPTS=-Xmx6g -Xms6g -XX:MetaspaceSize=196m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
{code} and in server.properties: {code:java}
#inter.broker.protocol.version=0.10.0.1
log.message.format.version=0.10.0.1
{code} I then created 1 topic (called large300 below) with 300 partitions, plus 220+ topics with 40 partitions each (called a1, a2, and so on), all with 2 replicas, so the whole cluster has more than 18,000 partitions in total; each broker holds 3000+ partitions and half of them (1500) act as leaders. This makes the cluster's broker and partition counts similar to our production environment. 2. I ran 30 2C4G clients (kafka-clients 0.10.0.1), each one doing the following: * A thread pool with 100 threads, each executing producerA.send("large300", [1024] random bytes) followed by producerA.flush() once per second; each thread must call semaphore.acquire() (in my test code, Semaphore semaphore = new Semaphore(44)) before producerA.send() and semaphore.release() after producerA.flush(). Allowing for network lag etc., the 30 clients together produce about 6000 messages/sec to topic large300. * One thread executing code like the following: {code:java}
int a = 0;
while (true) {
    Thread.sleep(5);
    a++;
    int loop = a % 10 + 1;
    for (int topicId = 1; topicId <= loop; topicId++) {
        String t = "a" + topicId;
        String k = String.valueOf(System.currentTimeMillis());
        producerB.send(t, k);
        producerB.flush();
    }
}
{code} so the 30 clients together produce messages to topics a1 (1000 messages/sec) through a10 (100 messages/sec). * 21 consumer groups, each starting 30 threads that consume from topic "large300". After all of this, each client has about 1000 connections to the 6 brokers. I then shut down broker id 6 gracefully and restarted it; after an automatic Preferred Replica Election (or one executed manually), we can observe random topics' produce speed drop. When this happens, producer JMX metrics show that request latency to node 6 becomes very high (producer.node.latency.png in the attachments), but server JMX (kafka.server.TotalTimeMs.99thPercentile,type=RequestMetrics,name=TotalTimeMs,request=Produce) shows that produce request handling time did not increase. Finally I used tcpdump to capture the producer-broker connection and found something suspicious; details are in the attachments: nodelay.txt shows that before the Preferred Replica Election was triggered, the producer's requests were sent very fast. ackdelay.txt shows that after the Preferred Replica Election was triggered, some produce requests are split into multiple TCP packets; the producer does not send the whole produce request at once but waits for the TCP ACK, and the broker sometimes waits about 40ms before sending the ACK back. This state does not recover even after 12 hours. 
Running ss also shows that the TCP rtt has become very high: {code:java}
ss -i -t -e -m | grep -A 1 "24132"
ESTAB 0 34593 [:::172.30.5.144]:24132 [:::172.30.3.36]:XmlIpcRegSvc timer:(on,208ms,0) uid:1080 ino:15632227 sk:933b84b13180 <->
skmem:(r0,rb65536,t0,tb262144,f7936,w57600,o0,bl0,d0) ts sack cubic wscale:2,0 rto:234 rtt:33.854/9.621 ato:40 mss:1412 rcvmss:536 advmss:1448 cwnd:10 ssthresh:7 bytes_acked:64001581 bytes_received:1366855 segs_out:48417 segs_in:48297 send 3.3Mbps lastsnd:26 lastrcv:150 lastack:26 pacing_rate 6.7Mbps unacked:1 rcv_rtt:26361 rcv_space:34696
{code} So if a produce request is split into 4 TCP packets, it can cost up to 120ms on the network. In the worst case in my test, the TCP connection had a window of only 51 bytes with wscale 2, which only allows the producer to send one TCP packet of less than 200 bytes; one produce request was split into 18 TCP packets, the broker delayed its ACKs by about 200ms, and it took about 3500ms to send the whole produce request! I believe something happens after the Preferred Replica Election (probably the change that combines all partitions' LeaderAndIsr requests into one request; in my case it asks broker 6 to become leader for, or stop fetching as follower for, 1500 partitions): it triggers TCP congestion control and never recovers unless I re-initialize the producer so that it builds a new, healthy TCP connection. I don't know exactly whether it really is TCP congestion control, why it cannot recover, or whether there are any OS options to avoid this. I have tried some Linux OS settings (net.ipv4.tcp_low_latency=1, net.ipv4.tcp_adv_win_scale=10, etc.), but none of them helped. Hope anyone can give me some advice. 
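A minimal sketch of the send-and-flush load pattern described above (100 sender threads gated by a Semaphore(44), ~1 KiB random payloads to topic large300, acks=1). The broker address and class name are placeholders, and the actual reproduction used spring-kafka's KafkaTemplate with autoFlush=true rather than a raw producer.
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

public class FlushLoadGenerator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("acks", "1");                         // matches the report: acks=1
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        Semaphore permits = new Semaphore(44);                     // per-client cap from the comment
        ExecutorService pool = Executors.newFixedThreadPool(100);  // 100 sender threads per client

        Runnable sender = () -> {
            byte[] payload = new byte[1024];                       // ~1 KiB random payload
            try {
                while (true) {
                    ThreadLocalRandom.current().nextBytes(payload);
                    permits.acquire();
                    try {
                        // send immediately followed by flush, mirroring spring-kafka autoFlush=true
                        producer.send(new ProducerRecord<>("large300", payload));
                        producer.flush();
                    } finally {
                        permits.release();
                    }
                    Thread.sleep(1000);                            // roughly one record per thread per second
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        for (int i = 0; i < 100; i++) {
            pool.execute(sender);
        }
    }
}
{code}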
[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control)
[ https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] li xiangyuan updated KAFKA-9211: Summary: kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control) (was: kafka upgrade 2.3.0 cause produce speed decrease) > kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control) > --- > > Key: KAFKA-9211 > URL: https://issues.apache.org/jira/browse/KAFKA-9211 > Project: Kafka > Issue Type: Bug > Components: controller, producer >Affects Versions: 2.3.0 >Reporter: li xiangyuan >Priority: Critical > Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, > producer-jstack.txt, producer.node.latency.png > > > Recently we try upgrade kafka from 0.10.0.1 to 2.3.0. > we have 15 clusters in production env, each one has 3~6 brokers. > we know kafka upgrade should: > 1.replcae code to 2.3.0.jar and restart all brokers one by one > 2.unset inter.broker.protocol.version=0.10.0.1 and restart all brokers > one by one > 3.unset log.message.format.version=0.10.0.1 and restart all brokers one > by one > > for now we have already done step 1 & 2 in 12 clusters.but when we try to > upgrade left clusters (already done step 1) in step 2, we found some topics > drop produce speed badly. > we have research this issue for long time, since we couldn't test it in > production environment and we couldn't reproduce in test environment, we > couldn't find the root cause. > now we only could describe the situation in detail as i know, hope anyone > could help us. > > 1.because bug KAFKA-8653, i add code below in KafkaApis.scala > handleJoinGroupRequest function: > {code:java} > if (rebalanceTimeoutMs <= 0) { > rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs > }{code} > 2.one cluster upgrade failed has 6 8C16G brokers, about 200 topics with 2 > replicas,every broker keep 3000+ partitions and 1500+ leader partition, but > most of them has very low produce message speed,about less than > 50messages/sec, only one topic with 300 partitions has more than 2500 > message/sec with more than 20 consumer groups consume message from it. > so this whole cluster produce 4K messages/sec , 11m Bytes in /sec,240m Bytes > out /sec.and more than 90% traffic made by that topic has 2500messages/sec. > when we unset 5 or 6 servers' inter.broker.protocol.version=0.10.0.1 and > restart, this topic produce message drop to about 200messages/sec, i don't > know whether the way we use could tirgger any problem. > 3.we use kafka wrapped by spring-kafka and set kafkatemplate's > autoFlush=true, so each producer.send execution will execute producer.flush > immediately too.i know flush method will decrease produce performance > dramaticlly, but at least it seems nothing wrong before upgrade step 2. but > i doubt whether it's a problem now after upgrade. > 4.I noticed when produce speed decrease, some consumer group has large > message lag still consume message without any consume speed change or > decrease, so I guess only producerequest speed will drop down,but > fetchrequest not. > 5.we haven't set any throttle configuration, and all producers' acks=1(so > it's not broker replica fetch slow), and when this problem triggered, both > sever & producers cpu usage down, and servers' ioutil keep less than 30% ,so > it shuldn't be a hardware problem. 
> 6.this event triggered often(almost 100%) most brokers has done upgrade step > 2,then after a auto leader replica election executed, then we can observe > produce speed drop down,and we have to downgrade brokers(set > inter.broker.protocol.version=0.10.0.1)and restart brokers one by one,then it > could be normal. some cluster have to downgrade all brokers,but some cluster > could left 1 or 2 brokers without downgrade, i notice that the broker not > need downgrade is the controller. > 7.I have print jstack for producer & servers. although I do this not the same > cluster, but we can notice that their thread seems really in idle stat. > 8.both 0.10.0.1 & 2.3.0 kafka-client will trigger this problem too. > 8.unless the largest one topic will drop produce speed certainly, other topic > will drop produce speed randomly. maybe topicA will drop speed in first > upgrade attempt but next not, and topicB not drop speed in first attemp but > dropped when do another attempt. > 9.in fact, the largest cluster, has the same topic & group usage scenario > mentioned above, but the largest topic has 1w2 messages/sec,will upgrade fail > in step 1(just use 2.3.0.jar) > any help would be grateful, thx, i'm very sad now... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989396#comment-16989396 ] Rohan Kulkarni commented on KAFKA-9270: --- [~vvcephei], [~bchen225242] - Thanks for your response guys. [~vvcephei] - I had looked at 'default.api.timeout.ms' parameter. However, its really tricky to guess how much we should increase this value so that the TimeoutException would never occur in production. Would/Do you recommend setting this to Integer.MAX_VALUE just like default value for parameter 'retries'.? The idea is Kafka streams should not crash due to some glitch at the Broker, rather it should recover just like the normal Kafka Consumers. Regards, Rohan > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989395#comment-16989395 ] Rohan Kulkarni commented on KAFKA-9270: --- [~vvcephei], [~bchen225242] - Thanks for your response guys. [~vvcephei] - I had looked at 'default.api.timeout.ms' parameter. However, its really tricky to guess how much we should increase this value so that the TimeoutException would never occur in production. Would/Do you recommend setting this to Integer.MAX_VALUE just like default value for parameter 'retries'.? The idea is Kafka streams should not crash due to some glitch at the Broker, rather it should recover just like the normal Kafka Consumers. Regards, Rohan > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989401#comment-16989401 ] John Roesler commented on KAFKA-9270: - Hey Rohan, I see where you’re coming from, but I definitely wouldn’t set the config super high. You don’t want the Streams thread to just pause indefinitely waiting for a single commit, holding up progress on the rest of its tasks. Better to pause a moderate amount of time and then let the thread die if the call is still hung. At least then it would give up its tasks, and some other thread can take over. Note also that you have to pay attention to the max poll interval config. If the commit call is stuck waiting, then the thread isn’t calling poll either, so it could drop out of the consumer group. Personally, I’d probably set them to something like 10 minutes, which would be a pretty long “glitch” for the broker, but also might be tolerable from a liveness perspective. Then, I’d add monitoring for thread deaths. Maybe even an external sidecar process to bounce Streams if some of the threads die. Not ideal, I know, but hopefully enough to help you bide your time until we work through the proposal that Boyang has started to actually fix this problem for you (see the linked ticket). > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! 
StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
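A rough sketch of the advice in the comment above: bound the commit timeout at around 10 minutes rather than Integer.MAX_VALUE, keep max.poll.interval.ms in the same range, and surface thread deaths so external monitoring can restart the application. The application id, bootstrap servers, and topology are placeholders; the timeouts are passed to the embedded consumer via the consumer prefix.
{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class BoundedTimeoutStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AggregateJob");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class);
        // Roughly 10 minutes, per the advice above: long enough to ride out a broker glitch,
        // short enough that a stuck thread eventually gives up its tasks.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 600_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("AggregateJob-input").to("AggregateJob-output");     // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Surface thread deaths so an external watchdog or alerting can react and restart the app.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
        streams.start();
    }
}
{code}
Note that Streams versions before 2.8 have no built-in "replace thread" handler, so pairing a bounded timeout with an external restart mechanism, as suggested above, is the practical option until the linked proposal lands.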
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989410#comment-16989410 ] Rohan Kulkarni commented on KAFKA-9270: --- Thanks [~vvcephei]. Appreciate your help on this. > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989410#comment-16989410 ] Rohan Kulkarni edited comment on KAFKA-9270 at 12/6/19 5:41 AM: Thanks [~vvcephei]. Appreciate your help on this. Just to clarify, as per my understanding with KAFKA-8803 we are targeting to fix the TimeoutException in initTxn and Producer flow. Are we also going to target the consumer commitOffset flow with that issue? Otherwise current ticket will be better for independent tracking... was (Author: rohan26may): Thanks [~vvcephei]. Appreciate your help on this. > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group
[ https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989443#comment-16989443 ] dengziming commented on KAFKA-9275: --- hi, we can print consumer information in rebalance logs, but I think it is useless for the `GroupCoordinator` only contains a memberId of the consumer information which is random generated, the IP address of a consumer is not kept in the `GroupCoordinator`. > Print assignment and IP address in the log message when consumer > leaves/removed from the group > -- > > Key: KAFKA-9275 > URL: https://issues.apache.org/jira/browse/KAFKA-9275 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Priority: Minor > > In order to simplify identification of which member is causing rebalance, can > we add the IP address and the list of topic-partitions assigned to the member > that leaves/removed from the group? > And especially in the "reason" string for rebalance: > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 > This will allow a much faster investigation when a consumer group is stuck in > rebalancing state. > Even better if we can send this reason to all other members that are affected > by the rebalance. Currently, it takes forever to know what caused a rebalance > when a consumer group continuously goes into rebalancing state due to one > member keeps exceeding max.poll.intervals.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group
[ https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming updated KAFKA-9275: -- Comment: was deleted (was: hi, we can print consumer information in rebalance logs, but I think it is useless for the `GroupCoordinator` only contains a memberId of the consumer information which is random generated, the IP address of a consumer is not kept in the `GroupCoordinator`.) > Print assignment and IP address in the log message when consumer > leaves/removed from the group > -- > > Key: KAFKA-9275 > URL: https://issues.apache.org/jira/browse/KAFKA-9275 > Project: Kafka > Issue Type: Improvement >Reporter: Badai Aqrandista >Priority: Minor > > In order to simplify identification of which member is causing rebalance, can > we add the IP address and the list of topic-partitions assigned to the member > that leaves/removed from the group? > And especially in the "reason" string for rebalance: > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144 > https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493 > This will allow a much faster investigation when a consumer group is stuck in > rebalancing state. > Even better if we can send this reason to all other members that are affected > by the rebalance. Currently, it takes forever to know what caused a rebalance > when a consumer group continuously goes into rebalancing state due to one > member keeps exceeding max.poll.intervals.ms. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
[ https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-9212: -- Assignee: Jason Gustafson > Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest > -- > > Key: KAFKA-9212 > URL: https://issues.apache.org/jira/browse/KAFKA-9212 > Project: Kafka > Issue Type: Bug > Components: consumer, offset manager >Affects Versions: 2.3.0 > Environment: Linux >Reporter: Yannick >Assignee: Jason Gustafson >Priority: Critical > > When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one > broker got restarted (leaderEpoch updated at this point), the connect worker > crashed with the following error: > [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, > groupId=connect-ls] Uncaught exception in herder work thread, exiting: > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253) > org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by > times in 30003ms > > After investigation, it seems the consumer got fenced when sending > ListOffsetRequest in a loop and then timed out, as follows: > [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, > replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, > maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, > isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 > rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905) > [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Attempt to fetch offsets for partition > connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. > (org.apache.kafka.clients.consumer.internals.Fetcher:985) > > The above happens multiple times until it times out. > > According to the debug logs, the consumer always gets a leaderEpoch of 1 for this > topic when starting up: > > [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, > groupId=connect-ls] Updating last seen epoch from null to 1 for partition > connect_ls_config-0 (org.apache.kafka.clients.Metadata:178) > > > But according to our broker logs, the leaderEpoch should be 2, as follows: > > [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] > connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader > Epoch was: 1 (kafka.cluster.Partition) > > > This makes it impossible to restart the worker, as it will always get fenced and > then finally time out. > > It is also impossible to consume with a 2.3 kafka-console-consumer as > follows: > > kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic > connect_ls_config --from-beginning > > The above will just hang forever (which is not expected because there is > data), and we can see these debug messages: > [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, > groupId=console-consumer-3844] Attempt to fetch offsets for partition > connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. > (org.apache.kafka.clients.consumer.internals.Fetcher) > > > Interestingly, if we subscribe the same way with kafkacat (1.5.0), we > can consume without problems (it must be the way kafkacat consumes, ignoring > FENCED_LEADER_EPOCH): > > kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
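For reference, the symptom can also be exercised from a plain client, outside Connect. The following is a minimal Scala sketch (assuming, as in the ticket, a connect_ls_config topic and a broker reachable at the BOOTSTRAPSERVER:9092 placeholder) that performs the same latest-offset lookup seen in the DEBUG line above; if every attempt is answered with FENCED_LEADER_EPOCH, the call eventually fails with a TimeoutException much like the worker's "Failed to get offsets by times" error.
```
import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object FencedEpochRepro {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "BOOTSTRAPSERVER:9092") // placeholder from the ticket
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    val tp = new TopicPartition("connect_ls_config", 0)
    try {
      // endOffsets issues a ListOffsetRequest with timestamp -1 (latest).
      // With a stale cached leader epoch, the broker keeps answering
      // FENCED_LEADER_EPOCH and the call runs into the timeout instead of
      // returning an offset.
      val end = consumer.endOffsets(Collections.singletonList(tp), Duration.ofSeconds(30))
      println(s"end offset for $tp: ${end.get(tp)}")
    } catch {
      case e: TimeoutException =>
        println(s"offset lookup timed out, matching the worker failure: ${e.getMessage}")
    } finally {
      consumer.close()
    }
  }
}
```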
[jira] [Created] (KAFKA-9277) move all group state transition rules into their states
dengziming created KAFKA-9277: - Summary: move all group state transition rules into their states Key: KAFKA-9277 URL: https://issues.apache.org/jira/browse/KAFKA-9277 Project: Kafka Issue Type: Improvement Reporter: dengziming Assignee: dengziming Today the `GroupMetadata` maintains a validPreviousStates map covering all GroupStates:
```
private val validPreviousStates: Map[GroupState, Set[GroupState]] =
  Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
    CompletingRebalance -> Set(PreparingRebalance),
    Stable -> Set(CompletingRebalance),
    PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
    Empty -> Set(PreparingRebalance))
```
It would be cleaner to move all state transition rules into their states:
```
private[group] sealed trait GroupState {
  val validPreviousStates: Set[GroupState]
}
```
-- This message was sent by Atlassian Jira (v8.3.4#803005)
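To make the proposal concrete, here is a minimal, self-contained Scala sketch of what the refactor could look like; the transition sets mirror the validPreviousStates map quoted above, while the package name and the GroupStateMachine helper are illustrative only, not the actual kafka.coordinator.group code.
```
package sketch.coordinator.group

// Each state declares the states it may be entered from, so the external
// validPreviousStates map in GroupMetadata is no longer needed.
private[group] sealed trait GroupState {
  val validPreviousStates: Set[GroupState]
}

private[group] case object Empty extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

private[group] case object PreparingRebalance extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)
}

private[group] case object CompletingRebalance extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

private[group] case object Stable extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}

private[group] case object Dead extends GroupState {
  val validPreviousStates: Set[GroupState] =
    Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)
}

// A transition check then reads the rule off the target state itself.
private[group] object GroupStateMachine {
  def canTransition(from: GroupState, to: GroupState): Boolean =
    to.validPreviousStates.contains(from)
}
```
Keeping each rule next to its state means that adding a new state cannot leave a shared map out of date, which is presumably part of what the ticket means by "cleaner".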