[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-12-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988579#comment-16988579
 ] 

Bruno Cadonna commented on KAFKA-9088:
--

I understand all your concerns, and I have also sometimes had a hard time with 
EasyMock. However, I think the issues are only partly caused by EasyMock. The 
design of the production code may leak internals, the unit test may have issues 
because it tests too much or the wrong aspects, and EasyMock may be used 
incorrectly. For instance, sometimes it does not make sense to specify how often 
a method on the mock is called, or whether it is called at all. That would be 
the case for the {{InternalProcessorContext}} mock, and EasyMock offers 
functionality that allows you to specify exactly that. I would like to explore 
this part of EasyMock in this ticket. 
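
For illustration, a minimal sketch of the kind of relaxed expectations referred 
to above, using EasyMock's nice mocks and stub returns; the helper class and the 
stubbed values are illustrative only, not the proposed implementation:

{code:java}
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.easymock.EasyMock;

public class RelaxedContextMock {

    public static InternalProcessorContext relaxedContextMock() {
        // A "nice" mock ignores unexpected calls instead of failing the test,
        // so the test does not need to state how often (or whether) each
        // context method is invoked.
        final InternalProcessorContext context =
            EasyMock.niceMock(InternalProcessorContext.class);
        // Stubbed return values hold for any number of calls and do not
        // assert a call count.
        EasyMock.expect(context.taskId()).andStubReturn(new TaskId(0, 0));
        EasyMock.expect(context.applicationId()).andStubReturn("test-app");
        EasyMock.replay(context);
        return context;
    }
}
{code}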

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Assignee: Pierangelo Di Pilato
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-12-05 Thread Francisco Juan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988623#comment-16988623
 ] 

Francisco Juan commented on KAFKA-7447:
---

[~timvanlaer] we are seeing the same "Error loading offsets" message from the 
GroupMetadataManager class that you saw.

We have two questions:

1. Were you able to reproduce the error? If so, could you give us a hint?
2. Did updating to version 2.2.2 fix the issue?

Our setup looks like this:

Kafka version: 2.2.1
Number of brokers: 30
Number of leader partitions: 15785
Number of consumer-groups: 1150
inter.broker.protocol.version=1.1
min.insync.replicas=2

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29, because otherwise it is just too much to paste.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broke

[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-12-05 Thread Karolis Pocius (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988629#comment-16988629
 ] 

Karolis Pocius commented on KAFKA-7447:
---

[~francisco.juan], we went straight to 2.3.1, but seeing as the fix for 
KAFKA-8896 is included in both 2.2.2 and 2.3.1, I'd imagine upgrading should fix 
your issue.

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29, because otherwise it is just too much to paste.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataM

[jira] [Commented] (KAFKA-9219) NullPointerException when polling metrics from Kafka Connect

2019-12-05 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988631#comment-16988631
 ] 

Mickael Maison commented on KAFKA-9219:
---

Great! I wanted to backport it to 2.4 but was waiting for 2.4.0 to be released. 
Thanks for letting me know.

> NullPointerException when polling metrics from Kafka Connect
> 
>
> Key: KAFKA-9219
> URL: https://issues.apache.org/jira/browse/KAFKA-9219
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 2.4.0, 2.5.0
>
>
> The following stack trace appears:
>  
> {code:java}
> [2019-11-05 23:56:57,909] WARN Error getting JMX attribute 'assigned-tasks' 
> (org.apache.kafka.common.metrics.JmxReporter:202)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.connect.runtime.distributed.WorkerCoordinator$WorkerCoordinatorMetrics$2.measure(WorkerCoordinator.java:316)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:66)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:190)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:200)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1449)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
>   at 
> javax.management.remote.rmi.RMIConnectionImpl.getAttributes(RMIConnectionImpl.java:675)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
>   at sun.rmi.transport.Transport$1.run(Transport.java:200)
>   at sun.rmi.transport.Transport$1.run(Transport.java:197)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
>   at 
> sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:835)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at 
> sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-1, 
> groupId=backup-mm2] Herder stopped 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629)
> [2019-11-05 23:57:02,821] INFO [Worker clientId=connect-2, groupId=cv-mm2] 
> Herder stopping 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:609)
> [2019-11-05 23:57:07,822] INFO [Worker clientId=connect-2, groupId=cv-mm2] 
> Herder stopped 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:629)
> [2019-11-05 23:57:07,822] INFO Kafka MirrorMaker stopped. 
> (org.apache.kafka.connect.mirror.MirrorMaker:191)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8896) NoSuchElementException after coordinator move

2019-12-05 Thread Francisco Juan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988654#comment-16988654
 ] 

Francisco Juan commented on KAFKA-8896:
---

Hello [~hachikuji], could you please provide the steps or scenario that 
triggers this error?

 

We have a cluster running version 2.2.1 and it is throwing this same error 
+sometimes+ when there's a broker restart.

We can correlate the error that you saw with an earlier error that looks like 
"ERROR [GroupMetadataManager brokerId=2] Error loading offsets from 
__consumer_offsets". This happens shortly after a broker restarts.

We can't reproduce this error in a test environment, which we would like to do 
in order to verify whether an upgrade would actually fix our issue (some 
consumer groups losing their offsets).

 

Our cluster setup looks like this:
{code:java}
Kafka version: 2.2.1
Number of brokers: 30
Number of leader partitions: 15785
Number of consumer-groups: 1150
inter.broker.protocol.version=1.1
min.insync.replicas=2{code}

Error stack trace details:
{code:java}
[2019-11-28 08:13:22,603] ERROR [KafkaApi-42] Error when handling request: 
clientId=enrichment-worker-kafka, correlationId=92, api=HEARTBEAT, 
body={group_id=enrichment-worker-importio-webhook-consumer-eu,generation_id=9877,member_id=enrichment-worker-kafka-25821b62-f36b-4e64-905b-92019e4a5493}
 (kafka.server.KafkaApis)
java.util.NoSuchElementException: key not found: 
consumer-name-25821b62-f36b-4e64-905b-92019e4a5493
 at scala.collection.MapLike.default(MapLike.scala:235)
 at scala.collection.MapLike.default$(MapLike.scala:234)
 at scala.collection.AbstractMap.default(Map.scala:63)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:69)
 at kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198)
 at 
kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
 at 
kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34)
 at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
 at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
 at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleHeartbeat$2(GroupCoordinator.scala:486)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198)
 at 
kafka.coordinator.group.GroupCoordinator.handleHeartbeat(GroupCoordinator.scala:451)
 at kafka.server.KafkaApis.handleHeartbeatRequest(KafkaApis.scala:1336)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:120)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
 at java.lang.Thread.run(Thread.java:748)

[2019-11-28 08:13:18,175] ERROR [GroupMetadataManager brokerId=42] Error 
loading offsets from __consumer_offsets-24 
(kafka.coordinator.group.GroupMetadataManager)
java.util.NoSuchElementException: key not found: 
consumer-name-de868651-3166-46df-98c5-6196b9ade526
 at scala.collection.MapLike.default(MapLike.scala:235)
 at scala.collection.MapLike.default$(MapLike.scala:234)
 at scala.collection.AbstractMap.default(Map.scala:63)
 at scala.collection.mutable.HashMap.apply(HashMap.scala:69)
 at kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198)
 at 
kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
 at 
kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34)
 at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
 at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
 at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
 at 
kafka.coordinator.group.GroupCoordina

[jira] [Created] (KAFKA-9271) Client hangs until timeout when SSL handshake fails

2019-12-05 Thread Richard Wise (Jira)
Richard Wise created KAFKA-9271:
---

 Summary: Client hangs until timeout when SSL handshake fails
 Key: KAFKA-9271
 URL: https://issues.apache.org/jira/browse/KAFKA-9271
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.1
Reporter: Richard Wise


I accidentally forgot to set `security.protocol` to `SSL`, which meant that the 
Kafka client tried connecting to an SSL port via PLAINTEXT. I would expect this 
to fail immediately on the client side with an obvious message that the security 
settings do not match the server. Instead, the client continually retries (I can 
see the SSL handshake failure messages in the server logs) before timing out on 
querying metadata.

This behaviour is confusing and leads the developer to think that there is an 
issue of latency or load and that they need to increase hardware or timeouts. 
In addition, the continual retries (without any backoff) cause significant CPU 
load on both the client and server that can cause additional issues.
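
For reference, a minimal sketch of a consumer configured to match an SSL 
listener; the broker address, truststore path, and password below are 
placeholders, not values from this report:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SslClientConfig {

    public static KafkaConsumer<String, String> newSslConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9093"); // placeholder SSL listener
        // Without this line the client defaults to PLAINTEXT and the SSL
        // handshake fails on the broker side, as described above.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks"); // placeholder
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");   // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new KafkaConsumer<>(props);
    }
}
{code}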



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9272) Error when trying to connect to SSL port using PLAINTEXT is very confusing

2019-12-05 Thread Richard Wise (Jira)
Richard Wise created KAFKA-9272:
---

 Summary: Error when trying to connect to SSL port using PLAINTEXT 
is very confusing
 Key: KAFKA-9272
 URL: https://issues.apache.org/jira/browse/KAFKA-9272
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.2.1
Reporter: Richard Wise


I accidentally forgot to set `security.protocol` to SSL when connecting to an 
SSL port, so the client connected via PLAINTEXT. I expected the Kafka client to 
fail immediately with a message about mismatched security protocols, but instead 
it failed saying that the topic does not exist and auto-creation was not enabled.

This error message is very misleading and leads developers to think that the 
problem is the server configuration, not the client network security settings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-12-05 Thread Francisco Juan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988680#comment-16988680
 ] 

Francisco Juan commented on KAFKA-7447:
---

Thanks [~Karolis].

Would it be possible for you to describe a scenario where this issue is 
reproduced? I'd really like to confirm that updating the cluster version would 
fix this.

Best regards

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29, because otherwise it is just too much to paste.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.

[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-12-05 Thread Karolis Pocius (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988699#comment-16988699
 ] 

Karolis Pocius commented on KAFKA-7447:
---

We've encountered the issue not after a clean shutdown, as described 
originally, but when one of the brokers lost its ZooKeeper connection and then 
reconnected. So to test the fix, I simply closed the ZooKeeper port with 
iptables on one of the brokers for a few seconds and then reopened it.


> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29, because otherwise it is just too much to paste.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1

[jira] [Commented] (KAFKA-9250) Kafka streams stuck in rebalancing state after UnknownHostException

2019-12-05 Thread Vijay (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988732#comment-16988732
 ] 

Vijay commented on KAFKA-9250:
--

Thanks [~guozhang], we are planning to update the Kafka clients dependency to 
the latest available version (2.3.1), as some more defects have been fixed since 
2.2.0, and will post the results here.

> Kafka streams stuck in rebalancing state after UnknownHostException
> ---
>
> Key: KAFKA-9250
> URL: https://issues.apache.org/jira/browse/KAFKA-9250
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, config, network, streams
>Affects Versions: 2.2.0
>Reporter: Vijay
>Priority: Critical
>
> We are using a Kafka Streams (2.2.0) application to read messages from a 
> source topic, do some transformation, and send them to a destination topic. 
> The application started fine and processed messages until it encountered an 
> UnknownHostException, after which the application is stuck in the rebalancing 
> state and not processing messages.
>  
> Below are the properties we have configured :
> application.id = *
> bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
> num.stream.threads=3
> replication.factor=3
> num.standby.replicas=1
> max.block.ms=8640
> acks=all
> auto.offset.reset=earliest
> processing.guarantee=exactly_once
>  
> Additional details.
> Number of brokers - 3
> Source topic partition count - 12 and replication factor of 3
> Destination topic partition count - 12 and replication factor of 3
> 4 instances of stream application are deployed in docker containers.
>  
> Below are the some of the logs :
> {noformat}
> [WARN] [streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1] 
> o.a.k.clients.NetworkClient - [Consumer 
> clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
>  groupId=streams-example] Error connecting to node hostname1:port1 (id: 
> 2147438464 rack: null)
> java.net.UnknownHostException: hostname1 
> at java.net.InetAddress.getAllByName0(InetAddress.java:1280) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1192) 
> at java.net.InetAddress.getAllByName(InetAddress.java:1126) 
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117) 
> at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
>  
> at 
> org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
>  
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
>  
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287) 
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>  
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
>  
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
>  
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850)
>  
> at 
> org.apache.kafka.streams.proc

[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2019-12-05 Thread Oleg Muravskiy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988777#comment-16988777
 ] 

Oleg Muravskiy commented on KAFKA-9173:
---

I understand that this is how it is *implemented*, but it is not how it is 
*documented*, or at least I don't see it anywhere in the Streams documentation, 
apart from the description of the (default and only) 
{{DefaultPartitionGrouper}}.

Is there a reason for such behaviour? Could I just implement an alternative 
{{PartitionGrouper}} that will assign each partition to a new task?

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Major
>  Labels: user-experience
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions in each. I'm only using a 
> Processor interface, and a persistent state store.
> However, only one worker gets assigned partitions, all other workers get 
> nothing. Restarting the application, or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> another node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for signs of problems, so I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-9261) NPE when updating client metadata

2019-12-05 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reopened KAFKA-9261:
--

> NPE when updating client metadata
> -
>
> Key: KAFKA-9261
> URL: https://issues.apache.org/jira/browse/KAFKA-9261
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
>
> We have seen the following exception recently:
> {code}
> java.lang.NullPointerException
>   at java.base/java.util.Objects.requireNonNull(Objects.java:221)
>   at org.apache.kafka.common.Cluster.(Cluster.java:134)
>   at org.apache.kafka.common.Cluster.(Cluster.java:89)
>   at 
> org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:120)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:82)
>   at org.apache.kafka.clients.MetadataCache.(MetadataCache.java:58)
>   at 
> org.apache.kafka.clients.Metadata.handleMetadataResponse(Metadata.java:325)
>   at org.apache.kafka.clients.Metadata.update(Metadata.java:252)
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleCompletedMetadataResponse(NetworkClient.java:1059)
>   at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:845)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:548)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
> {code}
> The client assumes that if a leader is included in the response, then node 
> information must also be available. There are at least a couple possible 
> reasons this assumption can fail:
> 1. The client is able to detect stale partition metadata using leader epoch 
> information available. If stale partition metadata is detected, the client 
> ignores it and uses the last known metadata. However, it cannot detect stale 
> broker information and will always accept the latest update. This means that 
> the latest metadata may be a mix of multiple metadata responses and therefore 
> the invariant will not generally hold.
> 2. There is no lock which protects both the fetching of partition metadata 
> and the live broker when handling a Metadata request. This means an 
> UpdateMetadata request can arrive concurrently and break the intended 
> invariant.
> It seems case 2 has been possible for a long time, but it should be extremely 
> rare. Case 1 was only made possible with KIP-320, which added the leader 
> epoch tracking. It should also be rare, but the window for inconsistent 
> metadata is probably a bit bigger than the window for a concurrent update.
> To fix this, we should make the client more defensive about metadata updates 
> and not assume that the leader is among the live endpoints.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9088) Consolidate InternalMockProcessorContext and MockInternalProcessorContext

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988913#comment-16988913
 ] 

John Roesler commented on KAFKA-9088:
-

Thanks, Bruno, I don’t want you to feel besieged. It seems like this 
conversation has yielded a good list of common pitfalls with EasyMock, but at 
the end of the day the only way to know for sure is to try to make it work. 

It should be obvious from the diff whether it really makes things easier or 
not. As long as it’s approached in the spirit of a prototype, it shouldn’t be 
too frustrating to just take another stab at the implementation. 

Just one specific thought... It may be possible that some tests don’t need a 
rich interaction with the context. In those cases, maybe they can just create a 
mock inline. This might free up some of the constraints to simplify 
MockInternalProcessorContext for use with just the tests that need a more 
complex interaction. I.e., sometimes trying for a general drop-in replacement 
isn’t necessary, and just complicates everything. 

> Consolidate InternalMockProcessorContext and MockInternalProcessorContext
> -
>
> Key: KAFKA-9088
> URL: https://issues.apache.org/jira/browse/KAFKA-9088
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Bruno Cadonna
>Assignee: Pierangelo Di Pilato
>Priority: Minor
>  Labels: newbie
>
> Currently, we have two mocks for the {{InternalProcessorContext}}. The goal 
> of this ticket is to merge both into one mock or replace it with an 
> {{EasyMock}} mock. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988923#comment-16988923
 ] 

John Roesler commented on KAFKA-9270:
-

Hey Rohan,

There is a Consumer config to control how long it will wait for an ack on 
commit. You can just increase this config from the default of 60 seconds by 
passing it in as part of your Streams configs. 
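
For reference, a minimal sketch of passing such a consumer config through the 
Streams configuration, assuming the config in question is the consumer's 
{{default.api.timeout.ms}} (whose default is 60 seconds); the application id 
and bootstrap servers are placeholders:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class CommitTimeoutConfig {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-job");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        // Forward a consumer config through Streams by prefixing it with
        // "consumer."; here the consumer's API/commit timeout is raised from
        // the 60 second default to 2 minutes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120000);
        return props;
    }
}
{code}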

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a 
> production environment.
>  
> We have already implemented a ProductionExceptionHandler, which does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2019-12-05 Thread Bill Bejeck (Jira)
Bill Bejeck created KAFKA-9273:
--

 Summary: Refactor AbstractJoinIntegrationTest and Sub-classes
 Key: KAFKA-9273
 URL: https://issues.apache.org/jira/browse/KAFKA-9273
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Bill Bejeck
Assignee: Bill Bejeck


The AbstractJoinIntegrationTest uses an embedded broker, but not all of the 
sub-classes require the use of an embedded broker anymore. Additionally, there 
are two tests remaining that require an embedded broker, but they don't perform 
joins; they are tests validating other conditions, so ideally those tests should 
move into a separate test.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9268) Follow-on: Streams Threads may die from recoverable errors with EOS enabled

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988953#comment-16988953
 ] 

John Roesler commented on KAFKA-9268:
-

The only compatibility concern I'm aware of is that 2.2.1+ requires broker 
message format 0.11+.

But there might be other constraints keeping people on older versions of 
Streams, the biggest one being a desire to use more fully-baked versions, 
rather than bleeding-edge ones. 

Still, this might be the kind of problem you can live with, unless your 
environment is _extremely_ flaky. I created this ticket partly just to document 
that we're aware of the flaw, which hopefully would be helpful to a person 
trying to get to the bottom of why their threads are dying. Also, it captures 
valuable (and hard-won) context in case someone does want to pick this up and 
fix it for older versions.

> Follow-on: Streams Threads may die from recoverable errors with EOS enabled
> ---
>
> Key: KAFKA-9268
> URL: https://issues.apache.org/jira/browse/KAFKA-9268
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Priority: Major
> Attachments: 2.2-eos-failures-1.txt, 2.2-eos-failures-2.txt
>
>
> While testing Streams in EOS mode under frequent and heavy network 
> partitions, I've encountered exceptions leading to thread death in both 2.2 
> and 2.3 (although different exceptions).
> I believe this problem is addressed in 2.4+ by 
> https://issues.apache.org/jira/browse/KAFKA-9231 , however, if you look at 
> the ticket and corresponding PR, you will see that the solution there 
> introduced some tech debt around UnknownProducerId that needs to be cleaned 
> up. Therefore, I'm not backporting that fix to older branches. Rather, I'm 
> opening a new ticket to make more conservative changes in older branches to 
> improve resilience, if desired.
> These failures are relatively rare, so I don't think that a system or 
> integration test could reliably reproduce them. The steps to reproduce would be:
> 1. set up a long-running Streams application with EOS enabled (I used three 
> Streams instances)
> 2. inject periodic network partitions (I had each Streams instance schedule 
> an interruption at a random time between 0 and 3 hours, then schedule the 
> interruption to last a random duration between 0 and 5 minutes. The 
> interruptions are accomplished by using iptables to drop all traffic to/from 
> all three brokers)
> As far as the actual errors I've observed, I'm attaching the logs of two 
> incidents in which a thread was caused to shut down.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16988991#comment-16988991
 ] 

Boyang Chen commented on KAFKA-9270:


[~vvcephei] In fact, timeout exceptions are a very common thing, so we probably 
want to discuss whether we should fail the thread immediately when there is a 
timeout; for example, https://issues.apache.org/jira/browse/KAFKA-8803 has a 
similar problem with initialization.

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a 
> production environment.
>  
> We have already implemented a ProductionExceptionHandler, which does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2019-12-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989070#comment-16989070
 ] 

Sophie Blee-Goldman commented on KAFKA-9173:


That's a good point, it does need to be documented. We should consider making 
this more flexible, either by making it user-customizable (for example by 
adding a `separateNodeGroups` flag to the StreamsBuilder.stream(Pattern) 
overload) or maybe by autoscaling according to some heuristic. 

As far as workarounds for now, I wouldn't necessarily recommend implementing a 
custom PartitionGrouper since that's been deprecated in 2.4. You could always 
use normal topic subscription to source each topic individually and then do 
something like for (KStream stream : inputTopics) { _processingTopology(stream)_ } 
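
A minimal sketch of that workaround; the topic names and the 
addProcessingTopology helper are illustrative placeholders:

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class PerTopicTopology {

    public static StreamsBuilder buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Placeholder topic names; each topic gets its own source node instead
        // of one Pattern-based source shared by all topics, so its partitions
        // are not merged with same-numbered partitions of the other topics.
        final List<String> inputTopics = Arrays.asList("input-a", "input-b", "input-c");
        for (final String topic : inputTopics) {
            final KStream<String, String> stream = builder.stream(topic);
            addProcessingTopology(stream);
        }
        return builder;
    }

    private static void addProcessingTopology(final KStream<String, String> stream) {
        // Stand-in for the shared per-topic processing logic, e.g.
        // stream.process(...) with a persistent state store.
        stream.foreach((key, value) -> { });
    }
}
{code}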

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Major
>  Labels: user-experience
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions in each. I'm only using a 
> Processor interface, and a persistent state store.
> However, only one worker gets assigned partitions, all other workers get 
> nothing. Restarting the application, or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> another node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for signs of problems, so I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989123#comment-16989123
 ] 

John Roesler commented on KAFKA-9270:
-

Agreed, [~bchen225242], and I'm actually on board with your line of thinking, 
but I wanted to make sure that [~rohan26may] gets an actionable answer to his 
question, which is that he needs to increase the timeout configuration.

Can you perhaps create a new ticket, or a thread in the dev mailing list to 
discuss changing Streams's general approach for handling transient client 
failures?
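
For concreteness, a hedged sketch of that advice: raising the relevant client 
timeouts through the Streams config. Which timeout actually fires depends on the 
failing operation, and the application id, bootstrap server, and values below are 
illustrative only.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutTuningExample {

    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregate-job");   // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // illustrative

        // Offset commits go through the embedded consumer.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60_000);

        // Sends go through the embedded producer.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120_000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 180_000);
        return props;
    }
}
{code}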

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our Production server we intermittently observe Kafka Streams get crashed 
> with TimeoutException while committing offset. The only workaround seems to 
> be restarting the application which is not a desirable solution for a 
> production environment.
>  
> While we have already implemented a ProductionExceptionHandler, it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9131) failed producer metadata updates result in the unrelated error message

2019-12-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989159#comment-16989159
 ] 

ASF GitHub Bot commented on KAFKA-9131:
---

guozhangwang commented on pull request #7635: KAFKA-9131: Remove dead code for 
handling timeout exception
URL: https://github.com/apache/kafka/pull/7635
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> failed producer metadata updates result in the unrelated error message
> --
>
> Key: KAFKA-9131
> URL: https://issues.apache.org/jira/browse/KAFKA-9131
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Gleb Komissarov
>Assignee: Gleb Komissarov
>Priority: Major
>
> {{Producer Metadata TimeoutException}} is processed as a generic 
> RetriableException in RecordCollectorImpl.sendError. This results in an 
> irrelevant error message.
> We were supposed to see this
> "Timeout exception caught when sending record to topic %s. " +
>  "This might happen if the producer cannot send data to the Kafka cluster and 
> thus, " +
>  "its internal buffer fills up. " +
>  "This can also happen if the broker is slow to respond, if the network 
> connection to " +
>  "the broker was interrupted, or if similar circumstances arise. " +
>  "You can increase producer parameter `max.block.ms` to increase this 
> timeout."
> but got this:
> "You can increase the producer configs `delivery.timeout.ms` and/or " +
>  "`retries` to avoid this error. Note that `retries` is set to infinite by 
> default."
> These params are not applicable to metadata updates.
> Technical details:
> (1) Lines 221 - 236 in 
> kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
> are dead code. They are never executed because {{producer.send}} never throws 
> TimeoutException, but returns a failed future. You can see it in lines 
> 948-955 in 
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> (2) The exception is then processed in a callback function in the method 
> {{recordSendError}} on line 202. The DefaultProductionExceptionHandler is 
> used.
> (3) in {{recordSendError}} in the same class the timeout exception is 
> processed as RetriableException at lines 133-136. The error message is simply 
> wrong because tweaking {{delivery.timeout.ms}} 
> and {{retries}} has nothing to do with the issue in this case.
> Proposed solution:
> (1) Remove unreachable catch (final TimeoutException e) in 
> RecordCollectorImpl.java as Producer does not throw ApiExceptions.
> (2) Move the aforementioned catch clause to recordSendError method.
> (3) Process TimeoutException separately from RetriableException.
> (4) Implement a unit test to cover this corner case
>  
>  
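
A hedged sketch (not the actual Kafka source) of the key point behind steps (2) 
and (3): since TimeoutException is itself a RetriableException, it has to be 
checked first in the send-error handling so the metadata-timeout case gets the 
relevant advice. The adviceFor() helper is purely illustrative.

{code:java}
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;

public class SendErrorHandlingSketch {

    // Simplified stand-in for the callback-side error handling; the real
    // logic lives in RecordCollectorImpl and takes more arguments.
    static String adviceFor(final Exception exception) {
        if (exception instanceof TimeoutException) {
            // Metadata/buffer timeout: delivery.timeout.ms and retries are irrelevant here.
            return "You can increase producer parameter `max.block.ms` to increase this timeout.";
        } else if (exception instanceof RetriableException) {
            return "You can increase the producer configs `delivery.timeout.ms` and/or `retries`.";
        } else {
            return "Non-retriable error; consult the ProductionExceptionHandler.";
        }
    }
}
{code}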



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9131) failed producer metadata updates result in the unrelated error message

2019-12-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989161#comment-16989161
 ] 

Guozhang Wang commented on KAFKA-9131:
--

[~gkomissarov] Thanks for reporting and fixing this!

> failed producer metadata updates result in the unrelated error message
> --
>
> Key: KAFKA-9131
> URL: https://issues.apache.org/jira/browse/KAFKA-9131
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Gleb Komissarov
>Assignee: Gleb Komissarov
>Priority: Major
> Fix For: 2.5.0
>
>
> {{Producer Metadata TimeoutException}} is processed as a generic 
> RetriableException in RecordCollectorImpl.sendError. This results in an 
> irrelevant error message.
> We were supposed to see this
> "Timeout exception caught when sending record to topic %s. " +
>  "This might happen if the producer cannot send data to the Kafka cluster and 
> thus, " +
>  "its internal buffer fills up. " +
>  "This can also happen if the broker is slow to respond, if the network 
> connection to " +
>  "the broker was interrupted, or if similar circumstances arise. " +
>  "You can increase producer parameter `max.block.ms` to increase this 
> timeout."
> but got this:
> "You can increase the producer configs `delivery.timeout.ms` and/or " +
>  "`retries` to avoid this error. Note that `retries` is set to infinite by 
> default."
> These params are not applicable to metadata updates.
> Technical details:
> (1) Lines 221 - 236 in 
> kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
> are dead code. They are never executed because {{producer.send}} never throws 
> TimeoutException, but returns a failed future. You can see it in lines 
> 948-955 in 
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> (2) The exception is then processed in a callback function in the method 
> {{recordSendError}} on line 202. The DefaultProductionExceptionHandler is 
> used.
> (3) in {{recordSendError}} in the same class the timeout exception is 
> processed as RetriableException at lines 133-136. The error message is simply 
> wrong because tweaking {{delivery.timeout.ms}} 
> and {{retries}} has nothing to do with the issue in this case.
> Proposed solution:
> (1) Remove unreachable catch (final TimeoutException e) in 
> RecordCollectorImpl.java as Producer does not throw ApiExceptions.
> (2) Move the aforementioned catch clause to recordSendError method.
> (3) Process TimeoutException separately from RetriableException.
> (4) Implement a unit test to cover this corner case
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2019-12-05 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9274:
--

 Summary: Gracefully handle timeout exceptions on Kafka Streams
 Key: KAFKA-9274
 URL: https://issues.apache.org/jira/browse/KAFKA-9274
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen


Right now Streams does not treat timeout exceptions as retriable in general; 
instead they are thrown up to the application level. If not handled by the user, 
this unfortunately kills the stream thread.

In fact, timeouts happen mostly due to network issues or server-side 
unavailability. A hard failure on the client seems to be overkill.

We would like to discuss the best practice for handling timeout exceptions on 
Streams. The current state is still brainstorming; the goal is to consolidate all 
the cases that involve timeout exceptions within this ticket.
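
For context, a minimal sketch of the only hook currently available at the 
application level: the uncaught exception handler on KafkaStreams, which can 
observe a thread dying from an unhandled TimeoutException but cannot revive it. 
Topic names, application id, and bootstrap servers are illustrative placeholders.

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutObserverExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");   // trivial placeholder topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timeout-observer");   // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // illustrative
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Today this handler only lets us observe that a StreamThread died
        // (e.g. from an unhandled TimeoutException); it cannot revive the thread.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable));

        streams.start();
    }
}
{code}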



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989187#comment-16989187
 ] 

Guozhang Wang commented on KAFKA-9225:
--

[~vvcephei] Can we just require users to make code changes in their RocksDB 
options APIs in 3.0 while doing the upgrade in place?

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]]
> when running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analyze:*
> This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 
> native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 in upstream? Should there be any kind of tests to execute, please 
> kindly point me. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2019-12-05 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989190#comment-16989190
 ] 

Bill Bejeck commented on KAFKA-9274:


At the risk of stating the obvious, I think we should have a default number of 
`num.timeout.retries`.  The `num.timeout.retries` would be a new configuration 
(KIP required) that gets applied across the board in a Streams application for 
all cases where there could be a timeout connecting to the broker.
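
Purely as an illustration of the intended semantics, here is a sketch of a 
bounded retry wrapper; the `num.timeout.retries` name is hypothetical and does 
not exist as a configuration today.

{code:java}
import org.apache.kafka.common.errors.TimeoutException;

public class TimeoutRetryPolicy {

    // Hypothetical config value; `num.timeout.retries` does not exist today.
    private final int numTimeoutRetries;

    public TimeoutRetryPolicy(final int numTimeoutRetries) {
        this.numTimeoutRetries = numTimeoutRetries;
    }

    // Runs a broker-facing action, retrying only on TimeoutException,
    // and rethrows once the retry budget is exhausted.
    public void run(final Runnable brokerFacingAction) {
        int attempts = 0;
        while (true) {
            try {
                brokerFacingAction.run();
                return;
            } catch (final TimeoutException e) {
                if (++attempts > numTimeoutRetries) {
                    throw e;   // give up: surfaces to the stream thread as today
                }
            }
        }
    }
}
{code}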

> Gracefully handle timeout exceptions on Kafka Streams
> -
>
> Key: KAFKA-9274
> URL: https://issues.apache.org/jira/browse/KAFKA-9274
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Right now Streams does not treat timeout exceptions as retriable in general; 
> instead they are thrown up to the application level. If not handled by the user, 
> this unfortunately kills the stream thread.
> In fact, timeouts happen mostly due to network issues or server-side 
> unavailability. A hard failure on the client seems to be overkill.
> We would like to discuss the best practice for handling timeout 
> exceptions on Streams. The current state is still brainstorming; the goal is to 
> consolidate all the cases that involve timeout exceptions within this ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6989) Support Async Processing in Streams

2019-12-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6989:
-
Labels: api needs-kip  (was: needs-kip)

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api, needs-kip
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams]
> Today Kafka Streams use a single-thread per task architecture to achieve 
> embarrassing parallelism and good isolation. However there are a couple 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IOs with high-latency. Suppose you need 
> to access a remote REST api, read / write to an external store, or do a heavy 
> disk IO operation that may result in high latency. Current threading model 
> would block any other records before this record's done, waiting on the 
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do a RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straight-forward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2), another is for 
> implementing a scheduler: e.g. do some additional operations later based on 
> this processed record. Zalando Dublin, for example, are implementing 
> a distributed web crawler along these lines. Note that although this feature 
> can be handled in punctuation, it is not well aligned with our current offset 
> committing behavior, which always advances the offset once the record has 
> finished traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable to users for them to 
> implement multi-threaded processing themselves: users can always do async 
> processing in the Processor API by spawning a thread pool, e.g., but the key 
> is that the offset to be committed should only be advanced once such async 
> processing is done. This is a light-weight approach: we provide all the 
> pieces and tools, and users stack them up to build their own LEGOs (see the 
> sketch after this list).
> 2. Provide a general API to do async processing in the Processor API, and take 
> care of the offset committing internally. This is a heavy-weight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover the 
> majority of the remaining scenarios, and users do not need to worry about 
> internal implementation details such as offsets and fault tolerance.
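
To make option 1 above concrete, here is a hedged sketch of what user-level async 
processing in the Processor API looks like today; the callSlowExternalService() 
helper is a placeholder, and the comment marks exactly the gap described in this 
ticket (the offset may be committed before the async work finishes).

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class AsyncForwardingProcessor implements Processor<String, String> {

    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private ProcessorContext context;   // kept for completeness; forwarding from pool threads is not safe

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(final String key, final String value) {
        // Offload a slow external call (placeholder) to the thread pool.
        pool.submit(() -> {
            final String result = callSlowExternalService(value);   // hypothetical helper
            // NOTE: by the time this completes, Streams may already have
            // committed the offset of this record -- exactly the gap that the
            // "customizable commit()" option is meant to close.
            System.out.println(key + " -> " + result);
        });
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    private String callSlowExternalService(final String value) {
        return value;   // stand-in for a remote REST call or heavy IO
    }
}
{code}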



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9073) Kafka Streams State stuck in rebalancing after one of the StreamThread encounters java.lang.IllegalStateException: No current assignment for partition

2019-12-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-9073:
-
Fix Version/s: 2.3.2

> Kafka Streams State stuck in rebalancing after one of the StreamThread 
> encounters java.lang.IllegalStateException: No current assignment for 
> partition
> --
>
> Key: KAFKA-9073
> URL: https://issues.apache.org/jira/browse/KAFKA-9073
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: amuthan Ganeshan
>Priority: Major
> Fix For: 2.4.0, 2.3.2
>
> Attachments: KAFKA-9073.log
>
>
> I have a Kafka stream application that stores the incoming messages into a 
> state store, and later during the punctuation period, we store them into a 
> big data persistent store after processing the messages.
> The application consumes from 120 partitions distributed across 40 instances. 
> The application has been running fine without any problem for months, but all 
> of a sudden some of the instances failed because of a stream thread exception 
> saying  
> ```java.lang.IllegalStateException: No current assignment for partition 
> --changelog-98```
>  
> And other instances are stuck in the REBALANCING state and never come out 
> of it. Here is the full stack trace; I just masked the application-specific 
> app name and store name in the stack trace due to NDA.
>  
> ```
> 2019-10-21 13:27:13,481 ERROR 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> [org.apache.kafka.streams.processor.internals.StreamThread] [] stream-thread 
> [application.id-a2c06c51-bfc7-449a-a094-d8b770caee92-StreamThread-3] 
> Encountered the following error during processing:
> java.lang.IllegalStateException: No current assignment for partition 
> application.id-store_name-changelog-98
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:319)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.requestFailed(SubscriptionState.java:618)
>  at 
> org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:709)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:574)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1281)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ```
>  
> Now I checked the state store disk usage; it is less than 40% of the total 
> disk space available. Restarting the application solves the problem for a 
> short amount of time, but the error pops up again randomly on some other 
> instances quickly. I tried to change the retries and retry.backoff.ms 
> configuration, but it did not help at all.
> ```
> retries = 2147483647
> retry.backoff.ms
> ```
> After googling for some time I found there was a similar bug reported to the 
> Kafka team in the past, and also noticed my stack trace exactly matches 
> the stack trace of the reported bug.
> Here is the link to the similar bug reported about a year ago.
> https://issues.apache.org/jira/browse/KAFKA-7181
>  
> Now I am wondering

[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989195#comment-16989195
 ] 

John Roesler commented on KAFKA-9274:
-

Thanks for starting this, [~bchen225242]!

One thing we should document is that the current approach is actually 
intentional. Streams essentially delegates the handling of transient failures 
into the clients (Consumer and Producer and Admin). Accordingly, the current 
advice is to set the client timeouts high enough to tolerate transient outages.

However, I agree with you that this is not the best approach. Setting the 
timeout high enough to be resilient to most network outages has the perverse 
effect of allowing StreamThreads to get hung up on an individual operation for 
that amount of time, harming the overall ability of the system to make progress 
on all inputs. For example, what if only one broker is unhealthy/unavailable? 
Rather than sit around indefinitely waiting to commit on the partition that 
broker is leader for, we could attempt to make progress on other tasks, and 
then try to commit the stuck one again later.

Another issue is self-healing. Suppose that the broker cluster becomes 
unavailable. If we're running an application in which some or all of the 
threads will die from timeouts, then whenever the broker's operators _do_ bring 
it back up, we would have to actually restart the application to recover. Maybe 
this doesn't seem so bad, but if you're in a large organization operating 100 
Streams apps, it sounds like a pretty big pain to me.

Conversely, if Streams were to just log a warning each time it got a timeout, 
but continue trying to make progress, then we would know that there is 
something wrong (because we're monitoring the app for warnings), so we could 
alert the broker's operators. However, once the issue is resolved, our app will 
just automatically pick right back up where it left off.

At a very high level, I'd propose the following failure handling protocol:
1. *retriable error*: A transient failure (like a timeout exception). We just 
put the task to the side, and try to do work for the next task.
2. *recoverable error*: A permanent failure (like getting fenced) that we can 
recover from by re-joining the cluster. We try to close and re-open the 
producer, or initiate a rebalance to re-join the cluster, depending on the 
exact nature of the failure.
3. *fatal error*: A permanent failure that requires human intervention to 
resolve. Something like discovering that we're in the middle of upgrading to an 
incompatible topology, or that the application itself is invalid for some other 
reason. Attempting to continue could result in data corruption, so we should 
just shut down the whole application so that someone can figure out what's 
wrong and fix it.
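
To summarize the proposal in code form, a purely illustrative sketch: none of 
these types or the classify() mapping exist in Streams today, and a real 
classification would need to cover many more exception types.

{code:java}
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;

public class FailureClassification {

    enum FailureType { RETRIABLE, RECOVERABLE, FATAL }

    // Illustrative mapping only.
    static FailureType classify(final Exception e) {
        if (e instanceof TimeoutException) {
            return FailureType.RETRIABLE;      // park the task, work on others, retry later
        } else if (e instanceof ProducerFencedException) {
            return FailureType.RECOVERABLE;    // re-join the group / recreate the producer
        } else {
            return FailureType.FATAL;          // shut down so a human can investigate
        }
    }
}
{code}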

> Gracefully handle timeout exceptions on Kafka Streams
> -
>
> Key: KAFKA-9274
> URL: https://issues.apache.org/jira/browse/KAFKA-9274
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Major
>
> Right now Streams does not treat timeout exceptions as retriable in general; 
> instead they are thrown up to the application level. If not handled by the user, 
> this unfortunately kills the stream thread.
> In fact, timeouts happen mostly due to network issues or server-side 
> unavailability. A hard failure on the client seems to be overkill.
> We would like to discuss the best practice for handling timeout 
> exceptions on Streams. The current state is still brainstorming; the goal is to 
> consolidate all the cases that involve timeout exceptions within this ticket.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989195#comment-16989195
 ] 

John Roesler edited comment on KAFKA-9274 at 12/5/19 9:47 PM:
--

Thanks for starting this, [~bchen225242]!

One thing we should document is that the current approach is actually 
intentional. Streams essentially delegates the handling of transient failures 
into the clients (Consumer and Producer and Admin). Accordingly, the current 
advice is to set the client timeouts high enough to tolerate transient outages.

However, I agree with you that this is not the best approach. Setting the 
timeout high enough to be resilient to most network outages has the perverse 
effect of allowing StreamThreads to get hung up on an individual operation for 
that amount of time, harming the overall ability of the system to make progress 
on all inputs. For example, what if only one broker is unhealthy/unavailable? 
Rather than sit around indefinitely waiting to commit on the partition that 
broker is leader for, we could attempt to make progress on other tasks, and 
then try to commit the stuck one again later.

Another issue is self-healing. Suppose that the broker cluster becomes 
unavailable. If we're running an application in which some or all of the 
threads will die from timeouts, then whenever the broker's operators _do_ bring 
it back up, we would have to actually restart the application to recover. Maybe 
this doesn't seem so bad, but if you're in a large organization operating 100 
Streams apps, it sounds like a pretty big pain to me.

Conversely, if Streams were to just log a warning each time it got a timeout, 
but continue trying to make progress, then we would know that there is 
something wrong (because we're monitoring the app for warnings), so we could 
alert the broker's operators. However, once the issue is resolved, our app will 
just automatically pick right back up where it left off.

At a very high level, I'd propose the following failure handling protocol:
1. *retriable error*: A transient failure (like a timeout exception). We just 
put the task to the side, and try to do work for the next task. When we come 
back around to the current task, we should try to repeat the operation. I.e., 
if it was a timeout on commit, we should try to commit again when we come back 
around, as opposed to just continuing to process that task without committing.
2. *recoverable error*: A permanent failure (like getting fenced) that we can 
recover from by re-joining the cluster. We try to close and re-open the 
producer, or initiate a rebalance to re-join the cluster, depending on the 
exact nature of the failure.
3. *fatal error*: A permanent failure that requires human intervention to 
resolve. Something like discovering that we're in the middle of upgrading to an 
incompatible topology, or that the application itself is invalid for some other 
reason. Attempting to continue could result in data corruption, so we should 
just shut down the whole application so that someone can figure out what's 
wrong and fix it.


was (Author: vvcephei):
Thanks for starting this, [~bchen225242]!

One thing we should document is that the current approach is actually 
intentional. Streams essentially delegates the handling of transient failures 
into the clients (Consumer and Producer and Admin). Accordingly, the current 
advice is to set the client timeouts high enough to tolerate transient outages.

However, I agree with you that this is not the best approach. Setting the 
timeout high enough to be resilient to most network outages has the perverse 
effect of allowing StreamThreads to get hung up on an individual operation for 
that amount of time, harming the overall ability of the system to make progress 
on all inputs. For example, what if only one broker is unhealthy/unavailable? 
Rather than sit around indefinitely waiting to commit on the partition that 
broker is leader for, we could attempt to make progress on other tasks, and 
then try to commit the stuck one again later.

Another issue is self-healing. Suppose that the broker cluster becomes 
unavailable. If we're running an application in which some or all of the 
threads will die from timeouts, then whenever the broker's operators _do_ bring 
it back up, we would have to actually restart the application to recover. Maybe 
this doesn't seem so bad, but if you're in a large organization operating 100 
Streams apps, it sounds like a pretty big pain to me.

Conversely, if Streams were to just log a warning each time it got a timeout, 
but continue trying to make progress, then we would know that there is 
something wrong (because we're monitoring the app for warnings), so we could 
alert the broker's operators. However, once the issue is resolved, our app will 
just automatically pick right back up where it left off.

At a very high level, 

[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989201#comment-16989201
 ] 

John Roesler commented on KAFKA-9225:
-

Hey [~guozhang],

Yes, I think it would be ok to plan to upgrade to RocksDB 6.x as part of the AK 
3.0 release. In 2.4, we are adding a log message warning anyone who uses the 
RocksDB APIs in question that this change will happen in the next major 
release. (We're not in a position to add deprecation annotations, since these are 
directly exposed RocksDB classes.)

Since we don't know when 3.0 would be, exactly, I was proposing a way that we 
could support both versions of RocksDB as soon as the very next release. But 
it's a cost/benefit question as to whether we should do it.

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]]
> when running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analyze:*
> This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 
> native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 in upstream? Should there 

[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-05 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989206#comment-16989206
 ] 

Sophie Blee-Goldman commented on KAFKA-9225:


At that point we might as well just [fork 
RocksDB|https://issues.apache.org/jira/browse/KAFKA-9148] ;) 

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]]
> when running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analyze:*
> This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 
> native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 in upstream? Should there be any kind of tests to execute, please 
> kindly point me. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2019-12-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989207#comment-16989207
 ] 

Guozhang Wang commented on KAFKA-9013:
--

Happens again:

https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/3756/consoleFull

Different error message:
{code}
12:47:24 org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
testReplication FAILED
12:47:24 java.lang.RuntimeException: Could not find enough records. found 
0, expected 1
12:47:24 at 
org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:334)
12:47:24 at 
org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:234)
{code}

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime

[jira] [Commented] (KAFKA-9260) Improve Serde "push down" and "wrapping"

2019-12-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989216#comment-16989216
 ] 

Guozhang Wang commented on KAFKA-9260:
--

One cause of the current design is that we used to build the topology first, 
then pass in the config which contains the default serdes, hence it was done in 
two consecutive steps.

However, now with optimization we already have a 
StreamsBuilder.build(Properties) override, and in practice it would not be too 
aggressive to "always" require the configs to be ready when building the topology 
anyway. So off the top of my head I think:

1) removing the other `StreamsBuilder.build()` overload without params, to always 
enforce configs being passed in.
2) removing the `KafkaStreams` constructor that takes both topology and config in 
favor of one that takes only the topology, since it should contain the configs already.

Then in the topology building phase, we can have a uniform framework as "if 
there's a serde inheritable, use it; otherwise use the default serde from the 
config directly and apply any wrapping logic if necessary". WDYT?
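
For reference, a hedged sketch of the usage pattern this proposal would 
standardize on, using the existing StreamsBuilder.build(Properties) overload so 
the default serdes are already known while the topology is built; the topic 
names, application id, and bootstrap servers are illustrative.

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class BuildWithConfigExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "serde-pushdown-example");  // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");          // illustrative
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        // Passing the config at build time is what lets the topology resolve
        // default serdes (and apply optimizations) up front.
        final Topology topology = builder.build(props);

        // Under the proposal above, KafkaStreams would no longer need the
        // config again here, but today it still does.
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
{code}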

> Improve Serde "push down" and "wrapping"
> 
>
> Key: KAFKA-9260
> URL: https://issues.apache.org/jira/browse/KAFKA-9260
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams DSL supports "serde push down" feature to let downstream 
> operators inherit upstream serdes if key and/or value are not modified. 
> Furthermore, some operators "wrap" user specified serdes internally (eg, 
> windowed aggregations wrap the user specified key-serde with a time/session 
> window serde – some stores use `ValueAndTimestampSerde` and foreign-key joins 
> also uses some internal wrappers).
> The current implementation faces a couple of issues, because the "serde push 
> down" feature is a DSL-level feature that is used when the Topology is 
> generated. Furthermore, "serde wrapping" is an operator-specific feature, not 
> a DSL concept per se. At runtime, neither "push down" nor "wrapping" are known 
> concepts.
> This design implies that if users specify serdes, wrapping and push down 
> work as expected. However, if we fall back to default serdes, special care 
> needs to be taken: for example, some operators do not apply the wrapping logic 
> at translation time, and there is additional code that does the wrapping 
> of default serdes at runtime. Another approach would be to wrap a null-Serde, 
> and update the wrapper later (ie, overwrite `null` with the default serde 
> from the config).
> Overall, the current design leads to bugs (eg, KAFKA-9248 and KAFKA-9259), 
> and user confusion how it actually works and when/where to specify serdes. 
> Hence, we should consider to rework how we do serde push down and/or wrapping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989225#comment-16989225
 ] 

John Roesler commented on KAFKA-9225:
-

I see what you did there :)

Forking RocksDB is also a viable option to deal with problems like the above, 
but in seriousness, I don't see offering different "plug in" modules with 
different state store implementations as "just about the same loe" as forking 
Rocks. They may actually both be useful approaches, though. 

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]]
> when running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analyze:*
> This issue is caused by rocksdbjni-5.18.3.jar which doesn't come with aarch64 
> native support. Replace rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] can fix 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 in upstream? Should there be any kind of tests to execute, please 
> kindly point me. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9148:
---
Description: 
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManager|https://github.com/dataArtisans/frocksdb/pull/4] to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such as the 
removal of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * Support for some architectures does not exist in all RocksDB versions, 
making Streams completely unusable for some users until we can upgrade the 
rocksdb dependency to one that supports their specific case
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
 *** [Custom comparator|#issuecomment-83145980]: a custom comparator could 
significantly improve the performance of session windows. This is trivial to do 
but given the high performance cost of crossing the jni, it is currently only 
practical to use a c++ comparator
 *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
currently used by Streams but a commonly requested feature, and may also allow 
improved range queries
 ** Even when an external contributor develops a solution for poorly performing 
Java functionality and helpfully tries to contribute their patch back to 
rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
#2283|https://github.com/facebook/rocksdb/pull/2283])

Cons:
 * More work (not to be trivialized, the truth is we don't and can't know how 
much extra work this will ultimately be)

Given that we rarely upgrade the Rocks dependency, use only some fraction of 
its features, and would need or want to make only minimal changes ourselves, it 
seems like we could actually get away with very little extra work by forking 
rocksdb. Note that as of this writing the frocksdb repo has only needed to open 
5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to 
maintain this will only grow over time, so we should think carefully about 
whether and when to start taking on this potential burden.

 

  was:
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such removal 
of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
 *** [Custom comparator|#issuecomment-83145980]]: a custom comparator could 
significantly improve the performance of session windows. This is trivial to do 
but given the high perform

[jira] [Updated] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9148:
---
Description: 
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManager|https://github.com/dataArtisans/frocksdb/pull/4] to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such as the 
removal of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * Support for some architectures does not exist in all RocksDB versions, 
making Streams completely unusable for some users until we can upgrade the 
rocksdb dependency to one that supports their specific case
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
 *** [Custom comparator|#issuecomment-83145980]: a custom comparator could 
significantly improve the performance of session windows. This is trivial to do 
but given the high performance cost of crossing the jni, it is currently only 
practical to use a c++ comparator
 *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
currently used by Streams but a commonly requested feature, and may also allow 
improved range queries
 ** Even when an external contributor develops a solution for poorly performing 
Java functionality and helpfully tries to contribute their patch back to 
rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
#2283|https://github.com/facebook/rocksdb/pull/2283])

Cons:
 * More work (not to be trivialized, the truth is we don't and can't know how 
much extra work this will ultimately be)

Given that we rarely upgrade the Rocks dependency, use only some fraction of 
its features, and would need or want to make only minimal changes ourselves, it 
seems like we could actually get away with very little extra work by forking 
rocksdb. Note that as of this writing the frocksdb repo has only needed to open 
5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to 
maintain this will only grow over time, so we should think carefully about 
whether and when to start taking on this potential burden.
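For context, the memory-management usage that motivated the 5.18 upgrade looks 
roughly like the RocksDBConfigSetter below (a sketch; the class name and byte 
sizes are illustrative, and only the WriteBufferManager wiring is shown):
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {

    // One shared cache and write buffer manager across all stores (sizes are illustrative).
    private static final long TOTAL_OFF_HEAP_BYTES = 128 * 1024 * 1024L;
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP_BYTES);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
            new WriteBufferManager(TOTAL_OFF_HEAP_BYTES / 2, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Memtables of every RocksDB instance draw from one shared budget.
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER);
    }
}
{code}
It is wired in through the rocksdb.config.setter Streams config.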

 

  was:
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such removal 
of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * Support for some architectures does not exist in all versions, making 
RocksDB – Streams RocksDB versions are  architecutres
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful feature

[jira] [Created] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group

2019-12-05 Thread Badai Aqrandista (Jira)
Badai Aqrandista created KAFKA-9275:
---

 Summary: Print assignment and IP address in the log message when 
consumer leaves/removed from the group
 Key: KAFKA-9275
 URL: https://issues.apache.org/jira/browse/KAFKA-9275
 Project: Kafka
  Issue Type: Improvement
Reporter: Badai Aqrandista


In order to simplify identification of which member is causing a rebalance, can 
we add the IP address and the list of topic-partitions assigned to the member 
that leaves or is removed from the group?

In particular, in the "reason" string for the rebalance: 

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493


This will allow a much faster investigation when a consumer group is stuck in 
rebalancing state.
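
In the meantime, a client-side stopgap is a ConsumerRebalanceListener that logs 
each member's own address and assignment around rebalances (a minimal sketch 
with an illustrative class name and log format; it does not replace the 
broker-side change proposed here):
{code:java}
import java.net.InetAddress;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log("revoked", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log("assigned", partitions);
    }

    private void log(String event, Collection<TopicPartition> partitions) {
        String address;
        try {
            address = InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            address = "unknown";
        }
        // Correlate this member's address and partitions with the broker-side rebalance logs.
        System.out.printf("member on %s %s partitions %s%n", address, event, partitions);
    }
}
{code}
It is registered via consumer.subscribe(topics, new LoggingRebalanceListener()), 
but the information still has to be collected from every client, which is why 
having it in the broker's "reason" string would be much more convenient.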




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9276) Unclear warning due to empty throttled fetch response

2019-12-05 Thread Bob Barrett (Jira)
Bob Barrett created KAFKA-9276:
--

 Summary: Unclear warning due to empty throttled fetch response
 Key: KAFKA-9276
 URL: https://issues.apache.org/jira/browse/KAFKA-9276
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0
Reporter: Bob Barrett


With a version 1.0.0 consumer talking to a 2.3.0 broker, the following 
WARN-level statement is logged when a request is throttled:
{code:java}
Ignoring fetch response containing partitions [] since it does not match the 
requested partitions [topic-29, topic-25, topic-21, topic-17, topic-33]{code}
It appears that the 1.0.0 consumer expects fetch data for all partitions, but 
the throttled response is empty, causing an unclear warning message. This may 
be a regression from KIP-219, which changed the throttling behavior to return 
the empty fetch response. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group

2019-12-05 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista updated KAFKA-9275:

Description: 
In order to simplify identification of which member is causing a rebalance, can 
we add the IP address and the list of topic-partitions assigned to the member 
that leaves or is removed from the group?

In particular, in the "reason" string for the rebalance: 

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493


This will allow a much faster investigation when a consumer group is stuck in 
rebalancing state.

Even better if we can send this reason to all other members that are affected 
by the rebalance. Currently, it takes forever to know what caused a rebalance 
when a consumer group continuously goes into the rebalancing state due to one 
member repeatedly exceeding max.poll.interval.ms.


  was:
In order to simplify identification of which member is causing rebalance, can 
we add the IP address and the list of topic-partitions assigned to the member 
that leaves/removed from the group?

And especially in the "reason" string for rebalance: 

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144

https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493


This will allow a much faster investigation when a consumer group is stuck in 
rebalancing state.



> Print assignment and IP address in the log message when consumer 
> leaves/removed from the group
> --
>
> Key: KAFKA-9275
> URL: https://issues.apache.org/jira/browse/KAFKA-9275
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Priority: Minor
>
> In order to simplify identification of which member is causing a rebalance, 
> can we add the IP address and the list of topic-partitions assigned to the 
> member that leaves or is removed from the group?
> And especially in the "reason" string for rebalance: 
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493
> This will allow a much faster investigation when a consumer group is stuck in 
> rebalancing state.
> Even better if we can send this reason to all other members that are affected 
> by the rebalance. Currently, it takes forever to know what caused a rebalance 
> when a consumer group continuously goes into the rebalancing state due to one 
> member repeatedly exceeding max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-05 Thread Michael Jaschob (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989287#comment-16989287
 ] 

Michael Jaschob commented on KAFKA-9212:


Chiming in here, I believe we've experienced the same error. I've been able to 
reproduce the behavior quite simply, as follows:
 - 3-broker cluster (running Apache Kafka 2.3.1)
 - one partition with replica assignment (0, 1, 2)
 - booted fourth broker (id 3)
 - initiated partition reassignment from (0, 1, 2) to (0, 1, 2, 3) with a very 
low throttle (for testing)

As soon as the reassignment begins, a 2.3.0 console consumer simply hangs when 
started. A 1.1.1 consumer does not have any issues. I see this in the leader 
broker's request logs:
{code:java}
[2019-12-05 16:38:36,790] DEBUG Completed 
request:RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, clientId=consumer-1, 
correlationId=1529) -- 
{replica_id=-1,isolation_level=0,topics=[{topic=DataPlatform.CGSynthTests,partitions=[{partition=0,current_leader_epoch=0,timestamp=-1}]}]},response:{throttle_time_ms=0,responses=[{topic=DataPlatform.CGSynthTests,partition_responses=[{partition=0,error_code=74,timestamp=-1,offset=-1,leader_epoch=-1}]}]}
 from connection 
172.22.15.67:9092-172.22.23.98:46974-9;totalTime:0.27,requestQueueTime:0.044,localTime:0.185,remoteTime:0.0,throttleTime:0.036,responseQueueTime:0.022,sendTime:0.025,securityProtocol:PLAINTEXT,principal:User:data-pipeline-monitor,listener:PLAINTEXT
 (kafka.request.logger)
{code}
Note the FENCED_LEADER_EPOCH error code (74) in the ListOffsets response, as in the original report.

Once the reassignment completes, the 2.3.1 console consumer starts working. 
I've also tried a different reassignment (0, 1, 2) -> (3, 1, 2) with the same 
results.

Where we stand right now is we can't initiate partition reassignments in our 
production cluster without paralyzing a Spark application (using 2.3.0 client 
libs under the hood). Downgrading the Kafka client libs there isn't possible 
since they are part of the Spark assembly.

Any pointers on what the issue might be here? Struggling to understand the bug 
because it seems like any partition reassignment breaks LIST_OFFSETS requests 
from 2.3 clients, but that just seems to be too severe a problem to have gone 
unnoticed for so long. Even ideas for a workaround would help here, since we 
don't see a path to do partition reassignments without causing a production 
incident right now.
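
In case it helps anyone reproduce this outside of Spark, a small probe that 
issues the same ListOffsets lookup from a 2.3.x client could look like the 
sketch below (bootstrap servers, group id and class name are illustrative; the 
timed overload only bounds each attempt, it is not a workaround for the 
fencing):
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ListOffsetsProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");           // illustrative
        props.put("group.id", "list-offsets-probe");              // illustrative
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("DataPlatform.CGSynthTests", 0);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Issues a ListOffsets request; while the reassignment is in flight this is
            // expected to keep getting FENCED_LEADER_EPOCH and time out after 10 seconds.
            Map<TopicPartition, Long> end =
                    consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(10));
            System.out.println("end offsets: " + end);
        }
    }
}
{code}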

> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0
> Environment: Linux
>Reporter: Yannick
>Priority: Critical
>
> When running Kafka connect s3 sink connector ( confluent 5.3.0), after one 
> broker got restarted (leaderEpoch updated at this point), the connect worker 
> crashed with the following error : 
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems it is because the consumer got fenced when 
> sending ListOffsetRequest in a loop and then timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until timeout.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for 
> this topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our brokers log, the leaderEpoch should be 2, as follows :
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This make impossible t

[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease

2019-12-05 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan updated KAFKA-9211:

Attachment: producer.node.latency.png

> kafka upgrade 2.3.0 cause produce speed decrease
> 
>
> Key: KAFKA-9211
> URL: https://issues.apache.org/jira/browse/KAFKA-9211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, producer 
>Affects Versions: 2.3.0
>Reporter: li xiangyuan
>Priority: Critical
> Attachments: broker-jstack.txt, producer-jstack.txt, 
> producer.node.latency.png
>
>
> Recently we tried to upgrade Kafka from 0.10.0.1 to 2.3.0.
> We have 15 clusters in the production environment, each with 3~6 brokers.
> We know the Kafka upgrade steps should be:
>       1. replace the code with the 2.3.0 jar and restart all brokers one by one
>       2. unset inter.broker.protocol.version=0.10.0.1 and restart all brokers 
> one by one
>       3. unset log.message.format.version=0.10.0.1 and restart all brokers one 
> by one
>  
> For now we have already done steps 1 & 2 in 12 clusters. But when we tried to 
> upgrade the remaining clusters (which had already done step 1) with step 2, we 
> found that some topics drop produce speed badly.
> We have researched this issue for a long time; since we couldn't test it in 
> the production environment and we couldn't reproduce it in the test 
> environment, we couldn't find the root cause.
> For now we can only describe the situation in as much detail as I know it; 
> hope anyone could help us.
>  
> 1. Because of bug KAFKA-8653, I added the code below in the KafkaApis.scala 
> handleJoinGroupRequest function:
> {code:java}
> if (rebalanceTimeoutMs <= 0) {
>  rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs
> }{code}
> 2. One cluster where the upgrade failed has 6 8C16G brokers and about 200 
> topics with 2 replicas; every broker keeps 3000+ partitions and 1500+ leader 
> partitions, but most of them have a very low produce rate of less than 50 
> messages/sec. Only one topic with 300 partitions has more than 2500 
> messages/sec, with more than 20 consumer groups consuming from it.
> So this whole cluster produces 4K messages/sec, 11 MB in/sec and 240 MB 
> out/sec, and more than 90% of the traffic is made by that topic with 2500 
> messages/sec.
> When we unset inter.broker.protocol.version=0.10.0.1 on 5 or 6 servers and 
> restarted, produce on this topic dropped to about 200 messages/sec. I don't 
> know whether the way we use Kafka could trigger any problem.
> 3. We use Kafka wrapped by spring-kafka with the KafkaTemplate's 
> autoFlush=true, so each producer.send execution executes producer.flush 
> immediately too. I know the flush method decreases produce performance 
> dramatically, but at least nothing seemed wrong before upgrade step 2; still, 
> I doubt whether it's a problem now after the upgrade.
> 4. I noticed that when produce speed decreases, some consumer groups with a 
> large message lag still consume messages without any change or decrease in 
> consume speed, so I guess only the produce-request rate drops, not the 
> fetch-request rate.
> 5. We haven't set any throttle configuration, and all producers use acks=1 (so 
> it's not slow broker replica fetching). When this problem is triggered, both 
> server & producer CPU usage goes down and the servers' ioutil stays below 30%, 
> so it shouldn't be a hardware problem.
> 6. This event is triggered often (almost 100%) once most brokers have done 
> upgrade step 2; after an automatic leader replica election is executed, we can 
> observe the produce speed drop, and we have to downgrade the brokers (set 
> inter.broker.protocol.version=0.10.0.1) and restart them one by one before it 
> returns to normal. Some clusters have to downgrade all brokers, but some 
> clusters can leave 1 or 2 brokers without a downgrade; I notice that the 
> broker that does not need the downgrade is the controller.
> 7. I have printed jstack for producers & servers. Although I did this on a 
> different cluster, we can see that their threads seem to be mostly idle.
> 8. Both the 0.10.0.1 & 2.3.0 kafka-clients trigger this problem.
> 9. While the largest topic always drops produce speed, other topics drop 
> produce speed randomly: topicA may drop speed in the first upgrade attempt but 
> not in the next, and topicB may not drop speed in the first attempt but drop 
> in another one.
> 10. In fact, the largest cluster, which has the same topic & group usage 
> scenario mentioned above but whose largest topic has 12,000 (1w2) 
> messages/sec, already fails the upgrade in step 1 (just using the 2.3.0 jar).
> Any help would be grateful, thx; I'm very sad now...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease

2019-12-05 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan updated KAFKA-9211:

Attachment: nodelay.txt

> kafka upgrade 2.3.0 cause produce speed decrease
> 
>
> Key: KAFKA-9211
> URL: https://issues.apache.org/jira/browse/KAFKA-9211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, producer 
>Affects Versions: 2.3.0
>Reporter: li xiangyuan
>Priority: Critical
> Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, 
> producer-jstack.txt, producer.node.latency.png
>
>
> Recently we tried to upgrade Kafka from 0.10.0.1 to 2.3.0.
> We have 15 clusters in the production environment, each with 3~6 brokers.
> We know the Kafka upgrade steps should be:
>       1. replace the code with the 2.3.0 jar and restart all brokers one by one
>       2. unset inter.broker.protocol.version=0.10.0.1 and restart all brokers 
> one by one
>       3. unset log.message.format.version=0.10.0.1 and restart all brokers one 
> by one
>  
> For now we have already done steps 1 & 2 in 12 clusters. But when we tried to 
> upgrade the remaining clusters (which had already done step 1) with step 2, we 
> found that some topics drop produce speed badly.
> We have researched this issue for a long time; since we couldn't test it in 
> the production environment and we couldn't reproduce it in the test 
> environment, we couldn't find the root cause.
> For now we can only describe the situation in as much detail as I know it; 
> hope anyone could help us.
>  
> 1. Because of bug KAFKA-8653, I added the code below in the KafkaApis.scala 
> handleJoinGroupRequest function:
> {code:java}
> if (rebalanceTimeoutMs <= 0) {
>  rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs
> }{code}
> 2. One cluster where the upgrade failed has 6 8C16G brokers and about 200 
> topics with 2 replicas; every broker keeps 3000+ partitions and 1500+ leader 
> partitions, but most of them have a very low produce rate of less than 50 
> messages/sec. Only one topic with 300 partitions has more than 2500 
> messages/sec, with more than 20 consumer groups consuming from it.
> So this whole cluster produces 4K messages/sec, 11 MB in/sec and 240 MB 
> out/sec, and more than 90% of the traffic is made by that topic with 2500 
> messages/sec.
> When we unset inter.broker.protocol.version=0.10.0.1 on 5 or 6 servers and 
> restarted, produce on this topic dropped to about 200 messages/sec. I don't 
> know whether the way we use Kafka could trigger any problem.
> 3. We use Kafka wrapped by spring-kafka with the KafkaTemplate's 
> autoFlush=true, so each producer.send execution executes producer.flush 
> immediately too. I know the flush method decreases produce performance 
> dramatically, but at least nothing seemed wrong before upgrade step 2; still, 
> I doubt whether it's a problem now after the upgrade.
> 4. I noticed that when produce speed decreases, some consumer groups with a 
> large message lag still consume messages without any change or decrease in 
> consume speed, so I guess only the produce-request rate drops, not the 
> fetch-request rate.
> 5. We haven't set any throttle configuration, and all producers use acks=1 (so 
> it's not slow broker replica fetching). When this problem is triggered, both 
> server & producer CPU usage goes down and the servers' ioutil stays below 30%, 
> so it shouldn't be a hardware problem.
> 6. This event is triggered often (almost 100%) once most brokers have done 
> upgrade step 2; after an automatic leader replica election is executed, we can 
> observe the produce speed drop, and we have to downgrade the brokers (set 
> inter.broker.protocol.version=0.10.0.1) and restart them one by one before it 
> returns to normal. Some clusters have to downgrade all brokers, but some 
> clusters can leave 1 or 2 brokers without a downgrade; I notice that the 
> broker that does not need the downgrade is the controller.
> 7. I have printed jstack for producers & servers. Although I did this on a 
> different cluster, we can see that their threads seem to be mostly idle.
> 8. Both the 0.10.0.1 & 2.3.0 kafka-clients trigger this problem.
> 9. While the largest topic always drops produce speed, other topics drop 
> produce speed randomly: topicA may drop speed in the first upgrade attempt but 
> not in the next, and topicB may not drop speed in the first attempt but drop 
> in another one.
> 10. In fact, the largest cluster, which has the same topic & group usage 
> scenario mentioned above but whose largest topic has 12,000 (1w2) 
> messages/sec, already fails the upgrade in step 1 (just using the 2.3.0 jar).
> Any help would be grateful, thx; I'm very sad now...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease

2019-12-05 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan updated KAFKA-9211:

Attachment: ackdelay.txt

> kafka upgrade 2.3.0 cause produce speed decrease
> 
>
> Key: KAFKA-9211
> URL: https://issues.apache.org/jira/browse/KAFKA-9211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, producer 
>Affects Versions: 2.3.0
>Reporter: li xiangyuan
>Priority: Critical
> Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, 
> producer-jstack.txt, producer.node.latency.png
>
>
> Recently we tried to upgrade Kafka from 0.10.0.1 to 2.3.0.
> We have 15 clusters in the production environment, each with 3~6 brokers.
> We know the Kafka upgrade steps should be:
>       1. replace the code with the 2.3.0 jar and restart all brokers one by one
>       2. unset inter.broker.protocol.version=0.10.0.1 and restart all brokers 
> one by one
>       3. unset log.message.format.version=0.10.0.1 and restart all brokers one 
> by one
>  
> For now we have already done steps 1 & 2 in 12 clusters. But when we tried to 
> upgrade the remaining clusters (which had already done step 1) with step 2, we 
> found that some topics drop produce speed badly.
> We have researched this issue for a long time; since we couldn't test it in 
> the production environment and we couldn't reproduce it in the test 
> environment, we couldn't find the root cause.
> For now we can only describe the situation in as much detail as I know it; 
> hope anyone could help us.
>  
> 1. Because of bug KAFKA-8653, I added the code below in the KafkaApis.scala 
> handleJoinGroupRequest function:
> {code:java}
> if (rebalanceTimeoutMs <= 0) {
>  rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs
> }{code}
> 2. One cluster where the upgrade failed has 6 8C16G brokers and about 200 
> topics with 2 replicas; every broker keeps 3000+ partitions and 1500+ leader 
> partitions, but most of them have a very low produce rate of less than 50 
> messages/sec. Only one topic with 300 partitions has more than 2500 
> messages/sec, with more than 20 consumer groups consuming from it.
> So this whole cluster produces 4K messages/sec, 11 MB in/sec and 240 MB 
> out/sec, and more than 90% of the traffic is made by that topic with 2500 
> messages/sec.
> When we unset inter.broker.protocol.version=0.10.0.1 on 5 or 6 servers and 
> restarted, produce on this topic dropped to about 200 messages/sec. I don't 
> know whether the way we use Kafka could trigger any problem.
> 3. We use Kafka wrapped by spring-kafka with the KafkaTemplate's 
> autoFlush=true, so each producer.send execution executes producer.flush 
> immediately too. I know the flush method decreases produce performance 
> dramatically, but at least nothing seemed wrong before upgrade step 2; still, 
> I doubt whether it's a problem now after the upgrade.
> 4. I noticed that when produce speed decreases, some consumer groups with a 
> large message lag still consume messages without any change or decrease in 
> consume speed, so I guess only the produce-request rate drops, not the 
> fetch-request rate.
> 5. We haven't set any throttle configuration, and all producers use acks=1 (so 
> it's not slow broker replica fetching). When this problem is triggered, both 
> server & producer CPU usage goes down and the servers' ioutil stays below 30%, 
> so it shouldn't be a hardware problem.
> 6. This event is triggered often (almost 100%) once most brokers have done 
> upgrade step 2; after an automatic leader replica election is executed, we can 
> observe the produce speed drop, and we have to downgrade the brokers (set 
> inter.broker.protocol.version=0.10.0.1) and restart them one by one before it 
> returns to normal. Some clusters have to downgrade all brokers, but some 
> clusters can leave 1 or 2 brokers without a downgrade; I notice that the 
> broker that does not need the downgrade is the controller.
> 7. I have printed jstack for producers & servers. Although I did this on a 
> different cluster, we can see that their threads seem to be mostly idle.
> 8. Both the 0.10.0.1 & 2.3.0 kafka-clients trigger this problem.
> 9. While the largest topic always drops produce speed, other topics drop 
> produce speed randomly: topicA may drop speed in the first upgrade attempt but 
> not in the next, and topicB may not drop speed in the first attempt but drop 
> in another one.
> 10. In fact, the largest cluster, which has the same topic & group usage 
> scenario mentioned above but whose largest topic has 12,000 (1w2) 
> messages/sec, already fails the upgrade in step 1 (just using the 2.3.0 jar).
> Any help would be grateful, thx; I'm very sad now...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9211) kafka upgrade 2.3.0 cause produce speed decrease

2019-12-05 Thread li xiangyuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989369#comment-16989369
 ] 

li xiangyuan commented on KAFKA-9211:
-

After some tests, I believe we have successfully reproduced this problem in the 
test environment. Below is what I did:

1. I started 6 8C16G brokers (kafka 2.3.0) with config:
{code:java}
KAFKA_HEAP_OPTS=-Xmx6g -Xms6g -XX:MetaspaceSize=196m -XX:+UseG1GC 
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80{code}
and in server.properties:
{code:java}
#inter.broker.protocol.version=0.10.0.1
log.message.format.version=0.10.0.1
{code}
Then I created 1 topic (called large300 below) with 300 partitions, plus 220+ 
topics with 40 partitions each (called a1, a2, and so on). All topics have 2 
replicas, so the whole cluster has more than 18,000 (1W8) partitions in total; 
each broker holds 3000 partitions and half of them (1500) act as leader 
partitions. This makes the cluster's broker & partition numbers similar to our 
production environment.

2. I ran 30 2C4G clients (kafka-clients 0.10.0.1); each one does the following:
 * A thread pool with 100 threads, each executing 
producerA.send("large300", [1024] random bytes) & producerA.flush() once per 
second. These threads have to call semaphore.acquire() (in my test code 
Semaphore semaphore = new Semaphore(44)) before producerA.send() and 
semaphore.release() after producerA.flush(). Accounting for network lag etc., 
the 30 clients together produce about 6000 messages/sec to topic large300 (see 
the sketch below).
 * One thread executes the code below:
{code:java}
while (true) {
   Thread.sleep(5);
   a++;
   int loop = a % 10 + 1;
   for (int topicId = 1; topicId <= loop; topicId++) {
 String t = "a" + topicId;
 String k = String.valueOf(System.currentTimeMillis());
 producerB.send(t, data);
 producerB.flush();
   }
}{code}
so the 30 clients together produce messages to topics a1 (1000 messages/sec) 
through a10 (100 messages/sec).
 * Start 21 consumer groups; each group starts 30 threads consuming messages 
from topic "large300".

After doing all this, each client has about 1000 connections to the 6 brokers.
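
For reference, the sketch referenced in the first bullet: a self-contained 
version of the large300 load generator, using a plain KafkaProducer instead of 
the spring-kafka wrapper with an explicit flush to mimic autoFlush=true 
(bootstrap servers and class name are illustrative):
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Large300LoadGenerator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");            // illustrative
        props.put("acks", "1");                                     // matches the setup above
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

        Semaphore permits = new Semaphore(44);                      // at most 44 in-flight send+flush pairs
        ExecutorService pool = Executors.newFixedThreadPool(100);   // 100 sender threads per client

        Runnable sender = () -> {
            byte[] payload = new byte[1024];
            try {
                while (true) {
                    ThreadLocalRandom.current().nextBytes(payload);
                    permits.acquire();
                    try {
                        producer.send(new ProducerRecord<>("large300", payload));
                        producer.flush();                           // mimic spring-kafka autoFlush=true
                    } finally {
                        permits.release();
                    }
                    Thread.sleep(1000);                             // roughly one message per second per thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 100; i++) {
            pool.execute(sender);
        }
    }
}
{code}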

Then I shut down broker id 6 gracefully and restarted it. After an automatic 
Preferred Replica Election (or one executed manually), we can observe random 
topics' produce speed drop. When this happens, the producer JMX metrics show 
that the request latency to node 6 becomes very high (producer.node.latency.png 
in the attachment), but the server JMX 
(kafka.server.TotalTimeMs.99thPercentile,type=RequestMetrics,name=TotalTimeMs,request=Produce)
 shows that the produce request handling time didn't increase.

Finally I used tcpdump to capture a producer-server connection, and I found 
something doubtful; you can see the details in the attachments:

nodelay.txt shows that before the Preferred Replica Election is triggered, the 
producer's requests are sent very fast.

ackdelay.txt shows that after the Preferred Replica Election is triggered, some 
producers' produce requests are split into multiple TCP packets, and the 
producer doesn't send the whole produce request at once but waits until it gets 
a TCP ACK packet; the broker sometimes waits about 40ms before sending back the 
ACK packet, and this state won't recover even after 12 hours.

And running ss shows that the TCP RTT becomes very high:
{code:java}
ss -i -t -e -m | grep -A 1 "24132"
ESTAB      0      34593  [:::172.30.5.144]:24132                 
[:::172.30.3.36]:XmlIpcRegSvc          timer:(on,208ms,0) uid:1080 
ino:15632227 sk:933b84b13180 <-> 
skmem:(r0,rb65536,t0,tb262144,f7936,w57600,o0,bl0,d0) ts sack cubic wscale:2,0 
rto:234 rtt:33.854/9.621 ato:40 mss:1412 rcvmss:536 advmss:1448 cwnd:10 
ssthresh:7 bytes_acked:64001581 bytes_received:1366855 segs_out:48417 
segs_in:48297 send 3.3Mbps lastsnd:26 lastrcv:150 lastack:26 pacing_rate 
6.7Mbps unacked:1 rcv_rtt:26361 rcv_space:34696{code}
So if a produce request is split into 4 TCP packets, it will cost at most 120ms 
on the network. In the worst situation in my test, the TCP connection had a 
window of only 51 bytes with wscale 2, which only allows the producer to send 
one TCP packet of less than 200 bytes; a produce request was split into 18 TCP 
packets, and the broker delayed each ACK by about 200ms, so it cost 3500ms to 
send the whole produce request!

I believe something happens after the Preferred Replica Election (most likely 
the change that combines all partitions' LeaderAndIsr requests into one request; 
in my situation it asks broker 6 to become leader & stop fetching as follower 
for 1500 partitions). It triggers TCP congestion control, and it never recovers 
unless I re-initialize the producer so that it builds another healthy TCP 
connection.

I don't know exactly whether it really is TCP congestion control, why it 
couldn't recover, or whether there are any OS options to avoid this. I have 
tried some Linux OS configs (net.ipv4.tcp_low_latency=1, 
net.ipv4.tcp_adv_win_scale=10, etc.), none of which helped.

hope any give me

[jira] [Updated] (KAFKA-9211) kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control)

2019-12-05 Thread li xiangyuan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

li xiangyuan updated KAFKA-9211:

Summary: kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control)  
(was: kafka upgrade 2.3.0 cause produce speed decrease)

> kafka upgrade 2.3.0 may cause tcp delay ack(Congestion Control)
> ---
>
> Key: KAFKA-9211
> URL: https://issues.apache.org/jira/browse/KAFKA-9211
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, producer 
>Affects Versions: 2.3.0
>Reporter: li xiangyuan
>Priority: Critical
> Attachments: ackdelay.txt, broker-jstack.txt, nodelay.txt, 
> producer-jstack.txt, producer.node.latency.png
>
>
> Recently we try upgrade kafka from 0.10.0.1 to 2.3.0.
> we have 15 clusters in production env, each one has 3~6 brokers.
> we know kafka upgrade should:
>       1.replcae code to 2.3.0.jar and restart  all brokers one by one
>       2.unset inter.broker.protocol.version=0.10.0.1 and restart all brokers 
> one by one
>       3.unset log.message.format.version=0.10.0.1 and restart all brokers one 
> by one
>  
> for now we have already done step 1 & 2 in 12 clusters.but when we try to 
> upgrade left clusters (already done step 1) in step 2, we found some topics 
> drop produce speed badly.
>      we have research this issue for long time, since we couldn't test it in 
> production environment  and we couldn't reproduce in test environment, we 
> couldn't find the root cause.
> now we only could describe the situation in detail as  i know, hope anyone 
> could help us.
>  
> 1.because bug KAFKA-8653, i add code below in KafkaApis.scala 
> handleJoinGroupRequest function:
> {code:java}
> if (rebalanceTimeoutMs <= 0) {
>  rebalanceTimeoutMs = joinGroupRequest.data.sessionTimeoutMs
> }{code}
> 2.one cluster upgrade failed has 6 8C16G brokers, about 200 topics with 2 
> replicas,every broker keep 3000+ partitions and 1500+ leader partition, but 
> most of them has very low produce message speed,about less than 
> 50messages/sec, only one topic with 300 partitions has more than 2500 
> message/sec with more than 20 consumer groups consume message from it.
> so this whole cluster  produce 4K messages/sec , 11m Bytes in /sec,240m Bytes 
> out /sec.and more than 90% traffic made by that topic has 2500messages/sec.
> when we unset 5 or 6 servers' inter.broker.protocol.version=0.10.0.1  and 
> restart, this topic produce message drop to about 200messages/sec,  i don't 
> know whether the way we use could tirgger any problem.
> 3.we use kafka wrapped by spring-kafka and set kafkatemplate's 
> autoFlush=true, so each producer.send execution will execute producer.flush 
> immediately too.i know flush method will decrease produce performance 
> dramaticlly, but  at least it seems nothing wrong before upgrade step 2. but 
> i doubt whether it's a problem now after upgrade.
> 4.I noticed when produce speed decrease, some consumer group has large 
> message lag still consume message without any consume speed change or 
> decrease, so I guess only producerequest speed will drop down,but 
> fetchrequest not. 
> 5.we haven't set any throttle configuration, and all producers' acks=1(so 
> it's not broker replica fetch slow), and when this problem triggered, both 
> sever & producers cpu usage down, and servers' ioutil keep less than 30% ,so 
> it shuldn't be a hardware problem.
> 6.this event triggered often(almost 100%) most brokers has done upgrade step 
> 2,then after a auto leader replica election executed, then we can observe  
> produce speed drop down,and we have to downgrade brokers(set 
> inter.broker.protocol.version=0.10.0.1)and restart brokers one by one,then it 
> could be normal. some cluster have to downgrade all brokers,but some cluster 
> could left 1 or 2 brokers without downgrade, i notice that the broker not 
> need downgrade is the controller.
> 7.I have print jstack for producer & servers. although I do this not the same 
> cluster, but we can notice that their thread seems really in idle stat.
> 8.both 0.10.0.1 & 2.3.0 kafka-client will trigger this problem too.
> 8.unless the largest one topic will drop produce speed certainly, other topic 
> will drop produce speed randomly. maybe topicA will drop speed in first 
> upgrade attempt but next not, and topicB not drop speed in first attemp but 
> dropped when do another attempt.
> 9.in fact, the largest cluster, has the same topic & group usage scenario 
> mentioned above, but the largest topic has 1w2 messages/sec,will upgrade fail 
> in step 1(just use 2.3.0.jar)
> any help would be grateful, thx, i'm very sad now...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread Rohan Kulkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989396#comment-16989396
 ] 

Rohan Kulkarni commented on KAFKA-9270:
---

[~vvcephei], [~bchen225242] - Thanks for your response guys.

[~vvcephei] - I had looked at the 'default.api.timeout.ms' parameter. However, 
it is really tricky to guess how much to increase this value so that the 
TimeoutException never occurs in production. Would you recommend setting it to 
Integer.MAX_VALUE, just like the default value of the 'retries' parameter?

The idea is that Kafka Streams should not crash due to some glitch at the 
broker; rather, it should recover just like a normal Kafka consumer.

Regards,
Rohan

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a production 
> environment.
>  
> While we have already implemented a ProductionExceptionHandler, it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread Rohan Kulkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989395#comment-16989395
 ] 

Rohan Kulkarni commented on KAFKA-9270:
---

[~vvcephei], [~bchen225242] - Thanks for your response guys.

[~vvcephei] - I had looked at the 'default.api.timeout.ms' parameter. However, 
it is really tricky to guess how much to increase this value so that the 
TimeoutException never occurs in production. Would you recommend setting it to 
Integer.MAX_VALUE, just like the default value of the 'retries' parameter?

The idea is that Kafka Streams should not crash due to some glitch at the 
broker; rather, it should recover just like a normal Kafka consumer.

Regards,
Rohan

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a production 
> environment.
>  
> While we have already implemented a ProductionExceptionHandler, it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989401#comment-16989401
 ] 

John Roesler commented on KAFKA-9270:
-

Hey Rohan,

I see where you’re coming from, but I definitely wouldn’t set the config super 
high. You don’t want the Streams thread to just pause indefinitely waiting for 
a single commit, holding up progress on the rest of its tasks. Better to pause 
a moderate amount of time and then let the thread die if the call is still 
hung. At least then it would give up its tasks, and some other thread can take 
over. 

Note also that you have to pay attention to the max poll interval config. If 
the commit call is stuck waiting, then the thread isn’t calling poll either, so 
it could drop out of the consumer group. 

Personally, I’d probably set them to something like 10 minutes, which would be 
a pretty long “glitch” for the broker, but also might be tolerable from a 
liveness perspective. Then, I’d add monitoring for thread deaths. Maybe even an 
external sidecar process to bounce Streams if some of the threads die. 

Not ideal, I know, but hopefully enough to help you bide your time until we 
work through the proposal that Boyang has started to actually fix this problem 
for you (see the linked ticket). 
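
For what it's worth, a minimal sketch of that setup (the ten-minute values, 
topic names and bootstrap servers are illustrative, not recommendations):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AggregateJobMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AggregateJob");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // illustrative

        // Give blocking consumer calls (including offset commits) up to ten minutes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 600_000);
        // Keep max.poll.interval.ms at least as large, so a stuck commit does not also
        // kick the member out of the group while it waits.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                   // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Surface thread deaths and the ERROR state so an external supervisor can restart the app.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                System.err.println("Streams entered ERROR state; alert and restart externally");
            }
        });
        streams.start();
    }
}
{code}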


> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a production 
> environment.
>  
> While we have already implemented a ProductionExceptionHandler, it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread Rohan Kulkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989410#comment-16989410
 ] 

Rohan Kulkarni commented on KAFKA-9270:
---

Thanks [~vvcephei]. Appreciate your help on this.

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a production 
> environment.
>  
> While have already implemented ProductionExceptionHandler which does not 
> seems to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9270) KafkaStream crash on offset commit failure

2019-12-05 Thread Rohan Kulkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989410#comment-16989410
 ] 

Rohan Kulkarni edited comment on KAFKA-9270 at 12/6/19 5:41 AM:


Thanks [~vvcephei]. Appreciate your help on this.

Just to clarify: as per my understanding, KAFKA-8803 targets fixing the TimeoutException in the initTxn and producer flows. Are we also going to cover the consumer commitOffset flow under that issue?

Otherwise, this ticket would be better kept open for independent tracking...


was (Author: rohan26may):
Thanks [~vvcephei]. Appreciate your help on this.
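
A possible stop-gap while the consumer commitOffset flow is not covered, assuming the commit is bounded by the embedded consumer's timeout settings: raise those timeouts (and commit less often) through the Streams consumer overrides. This is only a sketch with placeholder values, not something the ticket prescribes:

```scala
// Hedged workaround sketch (assumption: the offset-commit timeout is governed by
// the embedded consumer's timeout settings). All values below are placeholders.
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AggregateJob")   // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092") // placeholder bootstrap server
// consumerPrefix() scopes plain consumer configs to the consumers Streams creates internally.
props.put(StreamsConfig.consumerPrefix("request.timeout.ms"), "60000")
props.put(StreamsConfig.consumerPrefix("default.api.timeout.ms"), "120000")
// Committing less frequently narrows the window in which a broker hiccup can hit a commit.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000")
```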

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crash with a 
> TimeoutException while committing offsets. The only workaround seems to be 
> restarting the application, which is not a desirable solution for a 
> production environment.
>  
> We have already implemented a ProductionExceptionHandler, but it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 
> 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group

2019-12-05 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989443#comment-16989443
 ] 

dengziming commented on KAFKA-9275:
---

Hi, we can print consumer information in the rebalance logs, but I think it is of little use: the `GroupCoordinator` only keeps the randomly generated memberId of each consumer, and the IP address of a consumer is not kept in the `GroupCoordinator`.

> Print assignment and IP address in the log message when consumer 
> leaves/removed from the group
> --
>
> Key: KAFKA-9275
> URL: https://issues.apache.org/jira/browse/KAFKA-9275
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Priority: Minor
>
> In order to simplify identification of which member is causing a rebalance, can 
> we add the IP address and the list of topic-partitions assigned to the member 
> that leaves or is removed from the group?
> And especially in the "reason" string for rebalance: 
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493
> This will allow much faster investigation when a consumer group is stuck in 
> the rebalancing state.
> Even better would be to send this reason to all other members affected by the 
> rebalance. Currently, it takes forever to find out what caused a rebalance 
> when a consumer group continuously goes into the rebalancing state because one 
> member keeps exceeding max.poll.interval.ms.
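
To make the request concrete, a purely illustrative sketch of the kind of reason string being asked for, assuming the member's client host and current assignment were available at the call sites linked above (the helper below is hypothetical, not existing GroupCoordinator code):

```scala
// Hypothetical helper, not actual GroupCoordinator code: builds a rebalance "reason"
// carrying the member id, its client host, and its assigned topic-partitions.
def removalReason(memberId: String, clientHost: String, assignedPartitions: Seq[String]): String =
  s"removing member $memberId on $clientHost " +
    s"(assigned: ${assignedPartitions.mkString(", ")}) on LeaveGroup; rebalancing group"
```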



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9275) Print assignment and IP address in the log message when consumer leaves/removed from the group

2019-12-05 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-9275:
--
Comment: was deleted

(was: Hi, we can print consumer information in the rebalance logs, but I think it is 
of little use: the `GroupCoordinator` only keeps the randomly generated memberId of 
each consumer, and the IP address of a consumer is not kept in the `GroupCoordinator`.)

> Print assignment and IP address in the log message when consumer 
> leaves/removed from the group
> --
>
> Key: KAFKA-9275
> URL: https://issues.apache.org/jira/browse/KAFKA-9275
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Badai Aqrandista
>Priority: Minor
>
> In order to simplify identification of which member is causing a rebalance, can 
> we add the IP address and the list of topic-partitions assigned to the member 
> that leaves or is removed from the group?
> And especially in the "reason" string for rebalance: 
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L964
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L972
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1144
> https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L493
> This will allow much faster investigation when a consumer group is stuck in 
> the rebalancing state.
> Even better would be to send this reason to all other members affected by the 
> rebalance. Currently, it takes forever to find out what caused a rebalance 
> when a consumer group continuously goes into the rebalancing state because one 
> member keeps exceeding max.poll.interval.ms.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-12-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-9212:
--

Assignee: Jason Gustafson

> Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest
> --
>
> Key: KAFKA-9212
> URL: https://issues.apache.org/jira/browse/KAFKA-9212
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, offset manager
>Affects Versions: 2.3.0
> Environment: Linux
>Reporter: Yannick
>Assignee: Jason Gustafson
>Priority: Critical
>
> When running the Kafka Connect S3 sink connector (Confluent 5.3.0), after one 
> broker was restarted (the leaderEpoch was updated at this point), the Connect 
> worker crashed with the following error: 
> [2019-11-19 16:20:30,097] ERROR [Worker clientId=connect-1, 
> groupId=connect-ls] Uncaught exception in herder work thread, exiting: 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:253)
>  org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30003ms
>  
> After investigation, it seems this is because the consumer got fenced while 
> sending ListOffsetRequest in a loop and eventually timed out, as follows:
> [2019-11-19 16:20:30,020] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Sending ListOffsetRequest (type=ListOffsetRequest, 
> replicaId=-1, partitionTimestamps={connect_ls_config-0={timestamp: -1, 
> maxNumOffsets: 1, currentLeaderEpoch: Optional[1]}}, 
> isolationLevel=READ_UNCOMMITTED) to broker kafka6.fra2.internal:9092 (id: 4 
> rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher:905)
> [2019-11-19 16:20:30,044] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher:985)
>  
> The above happens multiple times until the operation times out.
>  
> According to the debug logs, the consumer always gets a leaderEpoch of 1 for this 
> topic when starting up:
>  
>  [2019-11-19 13:27:30,802] DEBUG [Consumer clientId=consumer-3, 
> groupId=connect-ls] Updating last seen epoch from null to 1 for partition 
> connect_ls_config-0 (org.apache.kafka.clients.Metadata:178)
>   
>   
>  But according to our broker logs, the leaderEpoch should be 2, as follows:
>   
>  [2019-11-18 14:19:28,988] INFO [Partition connect_ls_config-0 broker=4] 
> connect_ls_config-0 starts at Leader Epoch 2 from offset 22. Previous Leader 
> Epoch was: 1 (kafka.cluster.Partition)
>   
>   
>  This makes it impossible to restart the worker, as it will always get fenced 
> and finally time out.
>   
>  It is also impossible to consume with a 2.3 kafka-console-consumer, as 
> follows:
>   
>  kafka-console-consumer --bootstrap-server BOOTSTRAPSERVER:9092 --topic 
> connect_ls_config --from-beginning 
>   
>  The above will just hang forever (which is not expected because there is 
> data) and we can see these debug messages:
> [2019-11-19 22:17:59,124] DEBUG [Consumer clientId=consumer-1, 
> groupId=console-consumer-3844] Attempt to fetch offsets for partition 
> connect_ls_config-0 failed due to FENCED_LEADER_EPOCH, retrying. 
> (org.apache.kafka.clients.consumer.internals.Fetcher)
>   
>   
>  Interestingly, if we subscribe the same way with kafkacat (1.5.0), we can 
> consume without a problem (it must be that kafkacat consumes while ignoring 
> FENCED_LEADER_EPOCH):
>   
>  kafkacat -b BOOTSTRAPSERVER:9092 -t connect_ls_config -o beginning
>   
>   
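
For readers who have not hit this before, a small illustrative sketch of the fencing rule the logs point at (not broker code; it just mirrors the usual leader-epoch comparison), which matches the epoch-1 (client) versus epoch-2 (broker) mismatch described above:

```scala
// Illustrative only: the usual leader-epoch comparison on the broker side.
// With the consumer stuck on epoch 1 while the broker is on epoch 2, every
// ListOffset attempt is answered with FENCED_LEADER_EPOCH and retried forever.
def checkLeaderEpoch(requestEpoch: Option[Int], brokerEpoch: Int): String =
  requestEpoch match {
    case Some(e) if e < brokerEpoch => "FENCED_LEADER_EPOCH"  // stale client metadata (this report)
    case Some(e) if e > brokerEpoch => "UNKNOWN_LEADER_EPOCH" // broker is behind the client
    case _                          => "NONE"                 // epochs agree or no epoch supplied
  }
```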



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9277) move all group state transition rules into their states

2019-12-05 Thread dengziming (Jira)
dengziming created KAFKA-9277:
-

 Summary: move all group state transition rules into their states
 Key: KAFKA-9277
 URL: https://issues.apache.org/jira/browse/KAFKA-9277
 Project: Kafka
  Issue Type: Improvement
Reporter: dengziming
Assignee: dengziming


Today the `GroupMetadata` maintains a validPreviousStates map of all GroupStates:

```

private val validPreviousStates: Map[GroupState, Set[GroupState]] =
 Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
 CompletingRebalance -> Set(PreparingRebalance),
 Stable -> Set(CompletingRebalance),
 PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
 Empty -> Set(PreparingRebalance))

```

It would be cleaner to move all state transition rules into their respective states:

```

private[group] sealed trait GroupState {
 val validPreviousStates: Set[GroupState]
}

```
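
For illustration, a hedged sketch of what the refactoring could look like, using the state names and transition sets from the map above (the exact shape in GroupMetadata.scala may differ):

```scala
// Sketch only: each state carries its own set of valid previous states,
// reproducing the transitions from the validPreviousStates map above.
private[group] sealed trait GroupState {
  val validPreviousStates: Set[GroupState]
}

private[group] case object Empty extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

private[group] case object PreparingRebalance extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(Stable, CompletingRebalance, Empty)
}

private[group] case object CompletingRebalance extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(PreparingRebalance)
}

private[group] case object Stable extends GroupState {
  val validPreviousStates: Set[GroupState] = Set(CompletingRebalance)
}

private[group] case object Dead extends GroupState {
  val validPreviousStates: Set[GroupState] =
    Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead)
}
```

A transition check then becomes `targetState.validPreviousStates.contains(currentState)`.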



--
This message was sent by Atlassian Jira
(v8.3.4#803005)