[GitHub] [kafka] dajac commented on a change in pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#discussion_r459252113



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -775,6 +775,24 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
 verifyMarkPartitionsForTruncation()
   }
 
+  @Test
+  def testDefaultValueRestoredAfterDeleteDynamicConfig(): Unit = {
+val newProps = new Properties
+newProps.put(KafkaConfig.LogRetentionTimeMillisProp, "10")
+newProps.put(KafkaConfig.LogFlushIntervalMsProp, "1")
+TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, 
perBrokerConfig = false).all.get

Review comment:
   That makes sense, thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459258170



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   If we want to guarantee that `deadlineMs` is respected, I think we must set 
the timeout of the AdminClient's call accordingly via 
`CreateTopicsOptions.timeoutMs`. With the default, I think the call could take 
longer than half of `MAX_POLL_INTERVAL_MS_CONFIG`.
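
   For illustration, a rough sketch (not the PR's code) of what bounding the call by the remaining deadline could look like; the helper and its parameters are made up, only `CreateTopicsOptions.timeoutMs` comes from the comment above:

```java
import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;

final class DeadlineBoundCreateTopics {
    // Cap the AdminClient request timeout by whatever is left of the deadline,
    // so the create-topics call cannot outlive deadlineMs on its own.
    static CreateTopicsResult createWithDeadline(final Admin adminClient,
                                                 final Collection<NewTopic> newTopics,
                                                 final Time time,
                                                 final long deadlineMs) {
        final long remainingMs = Math.max(0L, deadlineMs - time.milliseconds());
        final CreateTopicsOptions options =
            new CreateTopicsOptions().timeoutMs((int) Math.min(remainingMs, Integer.MAX_VALUE));
        return adminClient.createTopics(newTopics, options);
    }
}
```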





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r459262136



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   Could we have the test call `latch.countDown()` in a background thread with 
a delay, right before we call `controller.shutdown()`? I guess it's possible 
to have race conditions with that solution.
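
   For illustration, a minimal sketch of the suggested timing (plain Java here, although the test is Scala; the fixed delay is exactly what makes it racy, as noted above):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class DelayedRelease {
    // Release the latch that blocks the in-flight event from a background
    // thread, a short delay after shutdown has been initiated.
    static void countDownAfter(final CountDownLatch latch, final long delayMs) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(latch::countDown, delayMs, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
    }
}
```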





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#issuecomment-662860332


   > add preempt(): Unit method for all ControllerEvent so that all events (and 
future events) must implement it
   > for events that have callbacks, move the preemption from individual 
methods to preempt()
   > add preemption for ApiPartitionReassignmentand ListPartitionReassignments
   
   Great idea



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344
 ] 

Paul Webb commented on KAFKA-8154:
--

Hi. Please note that we were recently affected by this issue. I tried several 
Kafka clients/brokers and JDKs. The actual cause, in the end, was that the Java 
security provider list was being changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding it at position 1, it appeared that the new provider was returning an 
invalid application buffer size, which may explain why this has been difficult 
to reproduce. When the new provider was added to the back of the list instead, 
Kafka behaved fine.
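
For illustration, a minimal sketch of the workaround described above (appending the provider instead of forcing it to position 1; the variable name just mirrors the snippet above):
{code:java}
import java.security.Provider;
import java.security.Security;

public final class ProviderRegistration {
    public static void register(final Provider providerToAdd) {
        // Appending keeps the default JSSE provider first in the lookup order,
        // so Kafka's SSL layer sizes its buffers from the expected provider.
        Security.addProvider(providerToAdd);
        // The problematic variant was: Security.insertProviderAt(providerToAdd, 1);
    }
}
{code}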

Hope this helps. 

 

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.

2020-07-23 Thread Shuo Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Zhang reassigned KAFKA-9343:
-

Assignee: Shuo Zhang

> Add ps command for Kafka and zookeeper process on z/OS.
> ---
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 2.4.0
> Environment: z/OS, OS/390
>Reporter: Shuo Zhang
>Assignee: Shuo Zhang
>Priority: Major
>  Labels: OS/390, z/OS
> Fix For: 2.5.0, 2.4.2
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and 
> description.+ 
> The existing method to check the Kafka process on other platforms isn't 
> applicable to z/OS; on z/OS, the best keyword we can use is the JOBNAME. 
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
> '{print $1}') 
> --> 
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
> grep | awk '{print $1}') 
> So does the ZooKeeper process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344
 ] 

Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:52 AM:


Hi. Please note that we were recently affected by this issue. I tried several 
Kafka clients/brokers and JDKs. The actual cause, in the end, was that the Java 
security provider list was being changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding it at position 1, it appeared that the new provider was returning an 
invalid application buffer size, which may explain why this has been difficult 
to reproduce.

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, 
Base64, Enveloped, XPath, XPath2, XSLT TransformServices)
2020-07-23 08:13:55.966Z INFO  -  -> SunPCSC: SunPCSC info: Sun PC/SC provider
2020-07-23 08:13:55.966Z INFO  -  -> BouncyCastleProvider: BC info: 
BouncyCastle Security Provider v1.61
2020-07-23 08:13:55.966Z INFO  -   SECURITY PROVIDERS -{noformat}
So my hypothesis is that the new provider (in this case from Conscrypt) is 
inserted at the head of the list. When getApplicationBufferSize() is called on 
the SSLSession, it returns MAX_PLAINTEXT_LENGTH, which is 2^14 (16384), as per 
[https://tools.ietf.org/html/rfc5246#section-6.2.1].
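
For illustration, a small diagnostic sketch (not part of the original report) that prints the buffer sizes the currently preferred provider reports, i.e. the values Kafka's SSL layer would size its buffers from:
{code:java}
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public final class SslBufferSizeCheck {
    public static void main(final String[] args) throws Exception {
        // Whichever provider sits first in the security provider list answers here.
        final SSLEngine engine = SSLContext.getDefault().createSSLEngine();
        System.out.println("application buffer: " + engine.getSession().getApplicationBufferSize());
        System.out.println("packet buffer: " + engine.getSession().getPacketBufferSize());
    }
}
{code}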

When the new provider was added to the back of the list, Kafka behaved fine and 
this issue disappeared completely.

Hope this helps. 

 


was (Author: pwebb.itrs):
Hi.  Please note that we were recently affected by this issue.  I tried several 
kafka clients/ brokers and JDKs. The actual cause in the end was that the java 
security providers was being changed.  Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding at position 1, it appeared that the new provider was returning an 
invalid application buffer size. This may explain why this could be difficult 
to reproduce.  When the new provider is added to the back of the list Kafka 
behaved fine. 

Hope this helps. 

 

> Buffer Overflow exceptions between brokers and with clients
> ---
>
> Key: KAFKA-8154
> URL: https://issues.apache.org/jira/browse/KAFKA-8154
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Rajesh Nataraja
>Priority: Major
> Attachments: server.properties.txt
>
>
> https://github.com/apache/kafka/pull/6495
> https://github.com/apache/kafka/pull/5785



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients

2020-07-23 Thread Paul Webb (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344
 ] 

Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:53 AM:


[~rsivaram] Hi. Please note that we were recently affected by this issue. 
I tried several Kafka clients/brokers and JDKs with no change.

The actual cause, in the end, was that the Java security provider list was being 
changed. Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding it at position 1, it appeared that the new provider was returning an 
invalid application buffer size, which may explain why this has been difficult 
to reproduce.

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, 
Base64, Enveloped, XPath, XPath2, XSLT TransformServices)
2020-07-23 08:13:55.966Z INFO  -  -> SunPCSC: SunPCSC info: Sun PC/SC provider
2020-07-23 08:13:55.966Z INFO  -  -> BouncyCastleProvider: BC info: 
BouncyCastle Security Provider v1.61
2020-07-23 08:13:55.966Z INFO  -   SECURITY PROVIDERS -{noformat}
So my hypothesis is that the new provider (in this case from Conscrypt) is 
inserted at the head of the list. When getApplicationBufferSize() is called on 
the SSLSession, it returns MAX_PLAINTEXT_LENGTH, which is 2^14 (16384), as per 
[https://tools.ietf.org/html/rfc5246#section-6.2.1].

When the new provider was added to the back of the list, Kafka behaved fine and 
this issue disappeared completely.

Hope this helps. 

 


was (Author: pwebb.itrs):
Hi.  Please note that we were recently affected by this issue.  I tried several 
kafka clients/ brokers and JDKs. The actual cause in the end was that the java 
security providers was being changed.  Specifically:
{code:java}
 Security.insertProviderAt(providerToAdd, 1);
{code}
By adding at position 1, it appeared that the new provider was returning an 
invalid application buffer size. This may explain why this could be difficult 
to reproduce.  

This is some logging to show the issue: 
{noformat}
2020-07-23 08:13:55.963Z INFO  -  Adding provider: OpenSSLProvider: Conscrypt 
info: Android's OpenSSL-backed security provider
2020-07-23 08:13:55.963Z INFO  -   SECURITY PROVIDERS -
2020-07-23 08:13:55.965Z INFO  -  -> Sun: SUN info: SUN (DSA key/parameter 
generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; 
JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, 
Collection CertStores, JavaPolicy Po
licy; JavaLoginConfig Configuration)
2020-07-23 08:13:55.965Z INFO  -  -> SunRsaSign: SunRsaSign info: Sun RSA 
signature provider
2020-07-23 08:13:55.965Z INFO  -  -> SunEC: SunEC info: Sun Elliptic Curve 
provider (EC, ECDSA, ECDH)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunJSSE info: Sun JSSE 
provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2)
2020-07-23 08:13:55.966Z INFO  -  -> SunJCE: SunJCE info: SunJCE Provider 
(implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, 
Diffie-Hellman, HMAC)
2020-07-23 08:13:55.966Z INFO  -  -> SunProvider: SunJGSS info: Sun (Kerberos 
v5, SPNEGO)
2020-07-23 08:13:55.966Z INFO  -  -> Provider: SunSASL info: Sun SASL 
provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, 
CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM)
2020-07-23 08:13:55.966Z INFO  -  -> XMLDSigRI: XMLDSig info: XMLDSig (DOM 
XMLSi

[jira] [Updated] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.

2020-07-23 Thread Shuo Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Zhang updated KAFKA-9343:
--
Fix Version/s: 2.6.0

> Add ps command for Kafka and zookeeper process on z/OS.
> ---
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 2.4.0
> Environment: z/OS, OS/390
>Reporter: Shuo Zhang
>Assignee: Shuo Zhang
>Priority: Major
>  Labels: OS/390, z/OS
> Fix For: 2.5.0, 2.6.0, 2.4.2
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and 
> description.+ 
> The existing method to check the Kafka process on other platforms isn't 
> applicable to z/OS; on z/OS, the best keyword we can use is the JOBNAME. 
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
> '{print $1}') 
> --> 
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
> grep | awk '{print $1}') 
> So does the ZooKeeper process.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon opened a new pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-23 Thread GitBox


showuon opened a new pull request #9062:
URL: https://github.com/apache/kafka/pull/9062


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-23 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-662919173


   @feyman2016 @huxihx, since you have experience with this test, could 
you review this small PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-10205:
---

Assignee: Igor Soarez

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163394#comment-17163394
 ] 

Igor Soarez commented on KAFKA-10205:
-

I'm happy to take this one on.

 

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] leonardge opened a new pull request #9063: Fixed deprecated Gradle build Properties.

2020-07-23 Thread GitBox


leonardge opened a new pull request #9063:
URL: https://github.com/apache/kafka/pull/9063


   The Gradle properties `baseName`, `classifier` and `version` have been 
deprecated, so I have changed these to `archiveBaseName`, `archiveClassifier` 
and `archiveVersion`. More information 
[here](https://docs.gradle.org/6.5/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8098) Flaky Test AdminClientIntegrationTest#testConsumerGroups

2020-07-23 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163397#comment-17163397
 ] 

Luke Chen commented on KAFKA-8098:
--

In the test, we first remove 1 member from the group and then remove the other 
2 members, and it sometimes failed at the second member-count assertion. After 
investigation, I found it's because auto commit is enabled for the consumers 
(the default setting), so an offset commit from a removed consumer gets the 
{{UNKNOWN_MEMBER_ID}} error, which then makes that member rejoin 
(see ConsumerCoordinator#OffsetCommitResponseHandler). That's why, after the 
second removal, the member list is sometimes not empty.

I set the consumer config to disable auto commit to fix this issue. Thanks.
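
For illustration, a minimal sketch of a consumer configured that way (this is not the actual test change; the broker address, group id and deserializers are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public final class NoAutoCommitConsumer {
    static KafkaConsumer<byte[], byte[]> create(final String bootstrapServers, final String groupId) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Disable auto commit so a removed member cannot hit UNKNOWN_MEMBER_ID
        // on a background offset commit and rejoin the group as a side effect.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }
}
{code}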

> Flaky Test AdminClientIntegrationTest#testConsumerGroups
> 
>
> Key: KAFKA-8098
> URL: https://issues.apache.org/jira/browse/KAFKA-8098
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3459/tests]
> {quote}java.lang.AssertionError: expected:<2> but was:<0>
> at org.junit.Assert.fail(Assert.java:89)
> at org.junit.Assert.failNotEquals(Assert.java:835)
> at org.junit.Assert.assertEquals(Assert.java:647)
> at org.junit.Assert.assertEquals(Assert.java:633)
> at 
> kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1194){quote}
> STDOUT
> {quote}2019-03-12 10:52:33,482] ERROR [ReplicaFetcher replicaId=2, 
> leaderId=1, fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:33,485] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:35,880] WARN Unable to read additional data from client 
> sessionid 0x104458575770003, likely client has closed socket 
> (org.apache.zookeeper.server.NIOServerCnxn:376)
> [2019-03-12 10:52:38,596] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-1-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:38,797] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition elect-preferred-leaders-topic-2-0 at offset 
> 0 (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:51,998] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:52:52,005] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,750] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,754] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition mytopic2-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,755] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-03-12 10:53:13,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition mytopic2-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionExc

[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-23 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-662968116


   Rebased on trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485
 ] 

Bruno Cadonna commented on KAFKA-8037:
--

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, interesting. With this we would have in the state 
store the same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the optimization 
by default without any further restrictions on serdes, since only the 
deserializer is used and never the serializer.
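
For context, a minimal sketch of how the read-path handler mentioned in the issue below is wired up (the config key and the built-in LogAndContinueExceptionHandler are part of the public Streams API; the gap discussed here is that plain-bytes restoration bypasses it):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public final class SkipCorruptedOnRead {
    static Properties streamsProps(final String appId, final String bootstrapServers) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Drops corrupted records on the normal read path; restoration copies
        // plain bytes and never consults this handler, which is the gap discussed.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class.getName());
        return props;
    }
}
{code}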

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485
 ] 

Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:04 PM:
-

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, promising. With this we would have in the state 
store the same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the optimization 
by default without any further restrictions on serdes, since only the 
deserializer is used and never the serializer.


was (Author: cadonna):
I find the idea of putting records in its original byte representation into 
state stores on source tables in combination with the deserialization of 
records during restoration interesting. With this we would have in the state 
store the same data as in the source topic minus the bad data.  

Having this two mechanism in place would allow us to switch on the optimization 
by default without any further restrictions on serdes since only the 
deserializer is used and never the serializer.  

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485
 ] 

Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:21 PM:
-

I find the idea of putting records in their original byte representation into 
state stores on source tables, in combination with deserializing records 
during restoration, promising. With this we would have in the state 
store the same data as in the source topic, minus the bad data.

Having these two mechanisms in place would allow us to switch on the optimization 
by default without any further restrictions on serdes, since only a single 
deserializer is used and never the serializer.


was (Author: cadonna):
I find the idea of putting records in its original byte representation into 
state stores on source tables in combination with the deserialization of 
records during restoration promising. With this we would have in the state 
store the same data as in the source topic minus the bad data.  

Having this two mechanism in place would allow us to switch on the optimization 
by default without any further restrictions on serdes since only the 
deserializer is used and never the serializer.  

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Igor Piddubnyi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163529#comment-17163529
 ] 

Igor Piddubnyi commented on KAFKA-8582:
---

Hi [~mjsax], as discussed in the PR, please assign the ticket to me.

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.
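
Purely to make the proposal concrete, a hypothetical sketch of what such a handler interface could look like (none of these names exist in Kafka Streams; they are illustrative only):
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical interface mirroring the DeserializationExceptionHandler pattern
// referenced above; not an actual Kafka Streams API.
public interface ExpiredWindowRecordHandler {

    enum ExpiredWindowRecordResponse {
        DROP,  // silently discard the past-expiration record (today's behaviour)
        FAIL   // stop the application before it can do something wrong
    }

    // The handler could forward the raw record to a dead-letter topic or an
    // alerting pipeline before deciding how the application should proceed.
    ExpiredWindowRecordResponse handle(ConsumerRecord<byte[], byte[]> record, long windowEndTime);
}
{code}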



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590
 ] 

Thiago Santos commented on KAFKA-9531:
--

I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
// code placeholder
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect
                                     |  at java.lang.Thread.run(Thread.java:748)
{code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>Priority: Major
>
> Hello,
>  
> My cluster setup is based on VMs behind a DNS CNAME.
> Example:  node.internal is a CNAME to either nodeA.internal or nodeB.internal
> Since kafka-client 1.2.1,  it has been observed that sometimes Kafka clients 
> get stuck on a loop with the exception:
> Example after nodeB.internal is replaced with nodeA.internal 
>  
> {code:java}
> 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer 
> clientId=consumer-6, groupId=consumer.group] Error connecting to node 
> nodeB.internal:9092 (id: 2 rack: null)
> java.net.UnknownHostException: nodeB.internal:9092
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1281) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1127) 
> ~[?:1.8.0_222]
>   at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005)
>  ~[stormjar.jar:?]
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>  ~[stormjar

[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590
 ] 

Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:44 PM:


I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}


was (Author: tcsantos):
I am experiencing the same problem. When i start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when i stop one of the 
containers in the Kafka cluster.

Any update about this issue?
{code:java}
// code placeholder
kafka-connect                                     | 
java.net.UnknownHostException: kafka3kafka-connect                              
       | java.net.UnknownHostException: kafka3kafka-connect                     
                |  at 
java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect          
                           |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect           
                          |  at 
java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect           
                          |  at 
org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect 
                                    |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect
                                     |  at 
org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect
                                     |  at 
org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect
                                     |  at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect
                                     |  at java.lang.Thread.run(Thread.java:748)
{code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Ru

[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2020-07-23 Thread Thiago Santos (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590
 ] 

Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:45 PM:


I am experiencing the same problem when I start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when I stop one of the 
containers in the Kafka cluster.

Any update on this issue?
{code:java}
java.net.UnknownHostException: kafka3
java.net.UnknownHostException: kafka3
    at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
    at java.net.InetAddress.getAllByName(InetAddress.java:1193)
    at java.net.InetAddress.getAllByName(InetAddress.java:1127)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}


was (Author: tcsantos):
I am experiencing the same problem. When i start a local Kafka cluster with 
docker-compose.

The kafka-connect producer gets stuck in this loop when i stop one of the 
containers in the Kafka cluster.

Any update about this issue?
{code:java}
kafka-connect    | java.net.UnknownHostException: kafka3
kafka-connect    | java.net.UnknownHostException: kafka3
kafka-connect    |  at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
kafka-connect    |  at java.net.InetAddress.getAllByName(InetAddress.java:1193)
kafka-connect    |  at java.net.InetAddress.getAllByName(InetAddress.java:1127)
kafka-connect    |  at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
kafka-connect    |  at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
kafka-connect    |  at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)
kafka-connect    |  at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)
kafka-connect    |  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code}

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>Priority: Major
>
> Hello,
>  
> My cluster setup is based on VMs behind DNS CNAME.
> Example:  node.internal is a CNAME to either nodeA.internal or nodeB.internal
> Since kafka-client 1.2.1,  it has been observed that sometimes Kafka clients 
> get stuck in a loop with the exception:
> Example after nodeB.internal is replaced with nodeA.internal 
>  
> {code:java}
> 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer 
> clientId=consumer-6, groupId=consumer.group] Error connecting to node 
> nodeB.internal:9092 (id: 2 rack: null)
> java.net.UnknownHostException: nodeB.internal:9092
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1281) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAdd

[GitHub] [kafka] soarez opened a new pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


soarez opened a new pull request #9064:
URL: https://github.com/apache/kafka/pull/9064


   Kafka Streams topologies must be defined in a deterministic order, otherwise 
users can run into a confusing NPE.
   This patch adds a more descriptive exception and documentation clarifying 
the deterministic requirement.
   
   https://issues.apache.org/jira/browse/KAFKA-10205
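
   To make the ordering requirement concrete, here is a small, made-up sketch (topic names and method names are hypothetical, not from this patch) of how two instances of the same application can end up with differently named operators when the topology is built from an unordered collection, and how imposing an explicit order avoids that:
   ```java
   import java.util.*;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.Topology;

   public class TopologyOrderExample {
       // Iterating a HashSet carries no ordering guarantee, so two instances may add
       // the sources in a different order and generate different operator names.
       static Topology nonDeterministic(final Set<String> topics) {
           final StreamsBuilder builder = new StreamsBuilder();
           for (final String topic : topics) {
               builder.stream(topic).to(topic + "-out");
           }
           return builder.build();
       }

       // Sorting first fixes the order, so every instance builds an identical Topology.
       static Topology deterministic(final Set<String> topics) {
           final StreamsBuilder builder = new StreamsBuilder();
           final List<String> ordered = new ArrayList<>(topics);
           Collections.sort(ordered);
           for (final String topic : ordered) {
               builder.stream(topic).to(topic + "-out");
           }
           return builder.build();
       }
   }
   ```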
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10205) NullPointerException in StreamTask

2020-07-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163394#comment-17163394
 ] 

Igor Soarez edited comment on KAFKA-10205 at 7/23/20, 2:01 PM:
---

I'm happy to take this one on.

I've opened [https://github.com/apache/kafka/pull/9064]


was (Author: soarez):
I'm happy take this one on.

 

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: Igor Soarez
>Priority: Minor
>  Labels: beginner, newbie
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] piddubnyi commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation

2020-07-23 Thread GitBox


piddubnyi commented on pull request #9017:
URL: https://github.com/apache/kafka/pull/9017#issuecomment-663037679


   Hi @mjsax, I commented on the ticket to get it assigned.
   Regarding the KIP, I just created a fresh account in Confluence; however, it 
looks like the account does not have the rights to create the KIP. Please suggest 
the best way to proceed. 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-23 Thread GitBox


mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} 
objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on 
this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is 
made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send 
consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying 
record-set's transfer logic.
+ *
+ * For example,
+ *
+ * 
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * 
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care 
must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is 
flushed to the consumer. This class is
+ * intended to be used with {@link 
org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+private final String dest;
+private final Consumer sendConsumer;
+private final ByteBufferOutputStream byteArrayOutputStream;
+private final DataOutput output;
+private int mark;
+
+public RecordsWriter(String dest, Consumer sendConsumer) {
+this.dest = dest;
+this.sendConsumer = sendConsumer;
+this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+this.output = new DataOutputStream(this.byteArrayOutputStream);
+this.mark = 0;
+}
+
+@Override
+public void writeByte(byte val) {
+try {
+output.writeByte(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeShort(short val) {
+try {
+output.writeShort(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeInt(int val) {
+try {
+output.writeInt(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeLong(long val) {
+try {
+output.writeLong(val);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeDouble(double val) {
+try {
+ByteUtils.writeDouble(val, output);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeByteArray(byte[] arr) {
+try {
+output.write(arr);
+} catch (IOException e) {
+throw new RuntimeException("RecordsWriter encountered an IO 
error", e);
+}
+}
+
+@Override
+public void writeUnsignedVarint(int i) {
+  

[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Summary: Partition#remoteReplicasMap can be empty in certain race 
conditions  (was: RemoteReplicasMap can be empty in certain race conditions)

> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
> - Partition#updateFollowerFetchState
> - DelayedDeleteRecords#tryComplete
> - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sorts of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira X 
> tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10301) RemoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-10301:
---

 Summary: RemoteReplicasMap can be empty in certain race conditions
 Key: KAFKA-10301
 URL: https://issues.apache.org/jira/browse/KAFKA-10301
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
- Partition#updateFollowerFetchState
- DelayedDeleteRecords#tryComplete
- Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sorts of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira X tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


mimaison commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r459460614



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -62,6 +65,7 @@
 private static final int NUM_PARTITIONS = 10;
 private static final int RECORD_TRANSFER_DURATION_MS = 20_000;
 private static final int CHECKPOINT_DURATION_MS = 20_000;
+private static final int OFFSET_SYNC_DURATION_MS = 30_000;
 
 private Time time = Time.SYSTEM;

Review comment:
   We can remove this field now that it's unused

##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
 backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
 }
 
+private void waitForConsumerGroupOffsetSync(Consumer 
consumer, List topics, String consumerGroupId)
+throws InterruptedException {
+Admin backupClient = backup.kafka().createAdminClient();
+List tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+IntStream.range(0, NUM_PARTITIONS).forEach(

Review comment:
   I'm not sure this is much better than a simple `for` loop. WDYT?
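
   For comparison, the plain-loop version might look roughly like this (a sketch only, reusing the test's `topics` and `NUM_PARTITIONS`, and assuming the stream simply builds one `TopicPartition` per topic/partition pair):
   ```java
   final List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
   for (final String topic : topics) {
       for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
           tps.add(new TopicPartition(topic, partition));
       }
   }
   ```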





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Description: 
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sorts of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.

  was:
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
- Partition#updateFollowerFetchState
- DelayedDeleteRecords#tryComplete
- Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sorts of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira X tracks that.


> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sorts of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10302) Ensure thread-safe access to Partition#remoteReplicasMap

2020-07-23 Thread Stanislav Kozlovski (Jira)
Stanislav Kozlovski created KAFKA-10302:
---

 Summary: Ensure thread-safe access to Partition#remoteReplicasMap
 Key: KAFKA-10302
 URL: https://issues.apache.org/jira/browse/KAFKA-10302
 Project: Kafka
  Issue Type: Bug
Reporter: Stanislav Kozlovski


A recent Jira (https://issues.apache.org/jira/browse/KAFKA-10301) exposed how 
easy it is to introduce nasty race conditions with the 
Partition#remoteReplicasMap data structure. It is a concurrent map which is 
modified inside a write lock but it is not always accessed through that lock.

Therefore it's possible for callers to access an intermediate state of the map, 
for instance in between updating the replica assignment for a given partition.



It would be good to ensure thread-safe access to the data structure in a way 
which makes it harder to introduce such regressions in the future.
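
One possible direction, sketched below with a made-up `Replica` placeholder (this is an illustration, not an agreed design), is to publish an immutable snapshot so that readers never observe a half-updated map:
{code:java}
import java.util.*;

public class ReplicaMapSnapshotSketch {
    // Made-up stand-in for kafka.cluster.Replica.
    static final class Replica {
        final int brokerId;
        Replica(final int brokerId) { this.brokerId = brokerId; }
    }

    // Readers see either the old snapshot or the new one, never an intermediate state.
    private volatile Map<Integer, Replica> remoteReplicas = Collections.emptyMap();

    // Writer side: still invoked under the existing write lock.
    void updateAssignment(final Set<Integer> replicaIds) {
        final Map<Integer, Replica> updated = new HashMap<>();
        for (final Integer id : replicaIds) {
            updated.put(id, new Replica(id));
        }
        remoteReplicas = Collections.unmodifiableMap(updated); // single atomic publish
    }

    // Reader side: no lock needed, the volatile reference swap is atomic.
    Optional<Replica> getReplica(final int brokerId) {
        return Optional.ofNullable(remoteReplicas.get(brokerId));
    }
}
{code}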



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-10301:

Description: 
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sorts of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks further modifications 
to the code.

  was:
In Partition#updateAssignmentAndIsr, we would previously update the 
`partition#remoteReplicasMap` by adding the new replicas to the map and then 
removing the old ones 
([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]

During a recent refactoring, we changed it to first clear the map and then add 
all the replicas to it 
([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))

While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
all callers that access the map structure use a lock. Some examples:
 - Partition#updateFollowerFetchState
 - DelayedDeleteRecords#tryComplete
 - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` 
without a lock, which itself is called by DelayedProduce. I think this can fail 
a  `ReplicaManager#appendRecords` call.

While we want to polish the code to ensure these sorts of race conditions become 
harder (or impossible) to introduce, it sounds safest to revert to the previous 
behavior given the timelines regarding the 2.6 release. Jira 
https://issues.apache.org/jira/browse/KAFKA-10302 tracks that.


> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sorts of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks further 
> modifications to the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski opened a new pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski opened a new pull request #9065:
URL: https://github.com/apache/kafka/pull/9065


   We would previously update the map by adding the new replicas to the map and 
then removing the old ones. During a recent refactoring, we changed the logic 
to first clear the map and then add all the replicas to it.
   
   While this is done in a write lock, not all callers that access the map 
structure use a lock. It is safer to revert to the previous behavior of showing 
the intermediate state of the map with extra replicas, rather than an 
intermediate state of the map with no replicas.
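
   A minimal sketch of why the ordering matters for readers that do not take the lock (hypothetical types, not the actual `Partition` code):
   ```java
   import java.util.Set;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   public class ReplicaMapRaceSketch {
       private final ConcurrentMap<Integer, String> replicas = new ConcurrentHashMap<>();

       // Clear-then-add: an unlocked reader can briefly observe an EMPTY map.
       void clearThenAdd(final Set<Integer> assignment) {
           replicas.clear();
           assignment.forEach(id -> replicas.put(id, "replica-" + id));
       }

       // Add-then-remove: an unlocked reader may see extra (stale) replicas for a moment,
       // but never an empty map, which is the safer intermediate state.
       void addThenRemove(final Set<Integer> assignment) {
           assignment.forEach(id -> replicas.putIfAbsent(id, "replica-" + id));
           replicas.keySet().removeIf(id -> !assignment.contains(id));
       }
   }
   ```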



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663065197


   cc @ijuma 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459527097



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   I decided to not get fancy with refactorings - this is literally the old 
code 
(https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657))





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions

2020-07-23 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163686#comment-17163686
 ] 

Stanislav Kozlovski commented on KAFKA-10301:
-

cc [~rhauch] - this would be good to get in 2.6

> Partition#remoteReplicasMap can be empty in certain race conditions
> ---
>
> Key: KAFKA-10301
> URL: https://issues.apache.org/jira/browse/KAFKA-10301
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Blocker
>
> In Partition#updateAssignmentAndIsr, we would previously update the 
> `partition#remoteReplicasMap` by adding the new replicas to the map and then 
> removing the old ones 
> ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)]
> During a recent refactoring, we changed it to first clear the map and then 
> add all the replicas to it 
> ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663]))
> While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not 
> all callers that access the map structure use a lock. Some examples:
>  - Partition#updateFollowerFetchState
>  - DelayedDeleteRecords#tryComplete
>  - Partition#getReplicaOrException - called in 
> `checkEnoughReplicasReachOffset` without a lock, which itself is called by 
> DelayedProduce. I think this can fail a  `ReplicaManager#appendRecords` call.
> While we want to polish the code to ensure these sorts of race conditions 
> become harder (or impossible) to introduce, it sounds safest to revert to the 
> previous behavior given the timelines regarding the 2.6 release. Jira 
> https://issues.apache.org/jira/browse/KAFKA-10302 tracks further 
> modifications to the code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE

2020-07-23 Thread GitBox


AshishRoyJava commented on pull request #9034:
URL: https://github.com/apache/kafka/pull/9034#issuecomment-663066844


   @abbccdda Unit test added.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663069630


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…

2020-07-23 Thread GitBox


asdaraujo commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r459536318



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -132,7 +132,7 @@ public String version() {
 return listConsumerGroupOffsets(group).entrySet().stream()
 .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
 .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-.filter(x -> x.downstreamOffset() > 0)  // ignore offsets we 
cannot translate accurately
+.filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we 
cannot translate accurately

Review comment:
   @ryannedolan @heritamas If you guys think this is safe enough I can 
remove that filter. But it doesn't hurt to leave it there just in case a rogue 
negative offset comes through for whatever reason/bug... Please let me know 
what you think.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode

2020-07-23 Thread Yogesh BG (Jira)
Yogesh BG created KAFKA-10303:
-

 Summary: kafka producer says connect failed in cluster mode
 Key: KAFKA-10303
 URL: https://issues.apache.org/jira/browse/KAFKA-10303
 Project: Kafka
  Issue Type: Bug
Reporter: Yogesh BG


Hi

 

I am using Kafka broker version 2.3.0.

We have two setups: standalone (one node) and a 3-node cluster.

We pump a lot of data: ~25 MBps, ~80K messages per second.

It all works well in single-node mode, but in the cluster the producer starts 
throwing connect failed (librdkafka). After some time it is able to connect 
again and resumes sending traffic.

What could be the issue? Some of the configurations are:

 
replica.fetch.max.bytes=10485760
num.network.threads=12

num.replica.fetchers=6

queued.max.requests=5

# The number of threads doing disk I/O

num.io.threads=12

replica.socket.receive.buffer.bytes=1

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663088134


   Currently working on introducing a test case for this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-23 Thread GitBox


mimaison commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) {
 return new DescribeLogDirsResult(new HashMap<>(futures));
 }
 
+private Map 
logDirDescriptions(DescribeLogDirsResponse response) {

Review comment:
   This can be static. Also should we keep it in `DescribeLogDirsResponse`?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+return prepareDescribeLogDirsResponse(error, logDir,
+prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+}
+
+private List prepareDescribeLogDirsTopics(
+long partitionSize, long offsetLag, String topic, int partition, 
boolean isFuture) {
+return singletonList(new DescribeLogDirsTopic()
+.setName(topic)
+.setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+.setPartitionIndex(partition)
+.setPartitionSize(partitionSize)
+.setIsFutureKey(isFuture)
+.setOffsetLag(offsetLag;
+}
+
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir,
+   
List topics) {
+return new DescribeLogDirsResponse(
+new DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+.setErrorCode(error.code())
+.setLogDir(logDir)
+.setTopics(topics)
+)));
+}
+
+@Test
+public void testDescribeLogDirs() throws ExecutionException, 
InterruptedException {
+Set brokers = Collections.singleton(0);
+String logDir = "/var/data/kafka";
+TopicPartition tp = new TopicPartition("topic", 12);
+long partitionSize = 1234567890;
+long offsetLag = 24;
+
+try (AdminClientUnitTestEnv env = mockClientEnv()) {
+env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+env.kafkaClient().prepareResponseFrom(
+prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, 
partitionSize, offsetLag),
+env.cluster().nodeById(0));
+
+DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+
+Map>> 
descriptions = result.descriptions();
+assertEquals(brokers, descriptions.keySet());
+assertNotNull(descriptions.get(0));
+assertDescriptionContains(descriptions.get(0).get(), logDir, tp, 
partitionSize, offsetLag);
+
+Map> allDescriptions = 
result.allDescriptions().get();
+assertEquals(brokers, allDescriptions.keySet());
+assertDescriptionContains(allDescriptions.get(0), logDir, tp, 
partitionSize, offsetLag);
+}
+}
+
+private void assertDescriptionContains(Map 
descriptionsMap, String logDir,

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 
error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) {
+return prepareDescribeLogDirsResponse(error, logDir,
+prepareDescribeLogDirsTopics(partitionSize, offsetLag, 
tp.topic(), tp.partition(), false));
+}
+
+private List prepareDescribeLogDirsTopics(

Review comment:
   This can be static

##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
 }
 }
 
+private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors 

[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459563205



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Would `remoteReplicasMap --= removedReplicas` work here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


mimaison commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663094946


   While not part of your changes, I noticed the test's assumptions are pretty 
loose. For example, we assume 
https://github.com/apache/kafka/pull/9029/files#diff-a03d58195cfe119d0b1ed2693cd0d691L362
 always consumes all 100 messages. The test also assumes there are no 
duplicates. While this may be fine when running in memory, Connect semantics 
are at least once.
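
   For illustration, a duplicate-tolerant assertion could look roughly like this (a sketch only; `records` and `NUM_RECORDS_PRODUCED` stand in for the values the test already has):
   ```java
   // Count distinct keys instead of raw records so that redelivered duplicates
   // (allowed under at-least-once) do not fail the assertion.
   final Set<String> distinctKeys = new HashSet<>();
   for (final ConsumerRecord<byte[], byte[]> record : records) {
       distinctKeys.add(new String(record.key(), StandardCharsets.UTF_8));
   }
   assertEquals(NUM_RECORDS_PRODUCED, distinctKeys.size());
   ```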



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459577643



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   Good question. Default `max.poll.interval.ms` is 5 minutes (i.e., the 
deadline is set to 2.5 minutes by default) while the default 
`api.default.timeout.ms` is 1 minute. Thus we might be ok?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163747#comment-17163747
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

Two more:
 * 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1630/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]
 * 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3480/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163749#comment-17163749
 ] 

Matthias J. Sax commented on KAFKA-9013:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7491/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:02 PM org.gla

[GitHub] [kafka] mjsax commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters

2020-07-23 Thread GitBox


mjsax commented on pull request #9052:
URL: https://github.com/apache/kafka/pull/9052#issuecomment-663107263


   Only flaky tests failed. I updated the corresponding Jira tickets.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-23 Thread GitBox


junrao commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-663109443


   @chia7712: Only 6 test failures in the latest run with your PR. 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595503536--chia7712--fix_8334_avoid_deadlock--3462b0008/report.html
   
   I will do another run on trunk for comparison.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


dajac commented on a change in pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#discussion_r459582710



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final 
StreamsConfig streams
 // have existed with the expected number of partitions, or some create 
topic returns fatal errors.
 log.debug("Starting to validate internal topics {} in partition 
assignor.", topics);
 
-int remainingRetries = retries;
+long currentWallClockMs = time.milliseconds();
+final long deadlineMs = currentWallClockMs + retryTimeoutMs;

Review comment:
   That's right. I misread the default value of `max.poll.interval.ms`, too 
many zeros for my eyes ;). The default works fine then. Do we want to protect 
ourselves if the user changes the default? Or shall we just call out that 
`api.default.timeout.ms` should be lower than `max.poll.interval.ms` somewhere?
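
   For illustration, a bounded call could look roughly like this (a sketch only, reusing the `deadlineMs`/`time`/`adminClient` names from the diff; `newTopics` is an assumed local, and this is not the actual patch):
   ```java
   // Cap the per-call admin timeout by whatever is left until the overall deadline.
   final long remainingMs = deadlineMs - time.milliseconds();
   if (remainingMs <= 0) {
       throw new TimeoutException("Could not create internal topics within the retry timeout.");
   }
   final CreateTopicsOptions options = new CreateTopicsOptions()
       .timeoutMs((int) Math.min(remainingMs, Integer.MAX_VALUE));
   final CreateTopicsResult result = adminClient.createTopics(newTopics, options);
   ```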





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-23 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-662810186


   > Could you rebase again? I will run the system tests after that. Thanks.
   
   Done. The known flaky ```group_mode_transactions_test.py``` is tracked by 
#9059



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a 
deterministic way.

Review comment:
   Nit: insert a `<p>` tag to actually get the new paragraph rendered.
   
   Nit: `Topology` -> `{@link Topology}`
   
   It's not really clear what "deterministic" means. We should elaborate more.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -49,6 +49,9 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * It is a requirement that the processing logic (Topology) be defined in a 
deterministic way.
+ * If different instances build different runtime code logic the resulting 
behavior may be unexpected.

Review comment:
   "different" for sure, but this implies that one might have an operator 
the other does not. The observed issue is, that even if both contain the same 
operator, they might be added in different order (and thus be named 
differently) to the `Topology`, thus we should stretch that order matters.
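   For illustration (a hypothetical toy application, not code from this PR), two instances that wire the same operators in a different order end up with differently numbered, and therefore differently named, processors:
   ```java
   // Instance A: mapValues is added first, so it gets the lower generated index
   // (e.g. KSTREAM-MAPVALUES-0000000001 vs. KSTREAM-FILTER-0000000003).
   final StreamsBuilder builderA = new StreamsBuilder();
   final KStream<String, String> inputA = builderA.stream("input");
   inputA.mapValues(v -> v.toUpperCase()).to("upper");
   inputA.filter((k, v) -> v != null).to("non-null");

   // Instance B: same operators, opposite order -- the generated processor names
   // (and any derived internal topic/store names) differ, so the two instances
   // effectively run different Topologies.
   final StreamsBuilder builderB = new StreamsBuilder();
   final KStream<String, String> inputB = builderB.stream("input");
   inputB.filter((k, v) -> v != null).to("non-null");
   inputB.mapValues(v -> v.toUpperCase()).to("upper");
   ```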

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1984,6 +1985,24 @@ public void 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
 task.initializeStateStores();
 }
 
+@Test(expected = TopologyException.class)

Review comment:
   We should not use this annotation but rather use `assertThrows` (we 
still have some code that does not use `assertThrows` but we try to lazily 
migrate our tests, as it provides a better test pattern).
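   A minimal sketch of that pattern (assuming JUnit 4.13's `org.junit.Assert.assertThrows`; the test name and the call under test are placeholders, not the exact code in this PR):
   ```java
   @Test
   public void shouldThrowTopologyExceptionForUnknownTopic() {
       // assertThrows pins the expected exception to one call instead of the whole test body
       assertThrows(TopologyException.class, () -> task.initializeStateStores());
   }
   ```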

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -181,8 +182,16 @@ public StreamTask(final TaskId id,
 
 final TimestampExtractor defaultTimestampExtractor = 
config.defaultTimestampExtractor();
 final DeserializationExceptionHandler 
defaultDeserializationExceptionHandler = 
config.defaultDeserializationExceptionHandler();
+final Set sourceTopics = topology.sourceTopics();
 for (final TopicPartition partition : partitions) {
-final SourceNode source = topology.source(partition.topic());
+final String topicName = partition.topic();
+if (!sourceTopics.contains(topicName)) {
+throw new TopologyException(
+"Topic not found " + topicName + ". Is the Streams 
Topology built in a deterministic way?"

Review comment:
   `Topic not found` sounds as if the topic was not found in the cluster -- 
however, what actually happened is that we received a record whose topic is 
unknown in the sub-topology.
   
   Similar to above, "deterministic" is not really easy to understand. I would 
also not phrase it as a question, but as a statement:
   ```
   ... This may happen if different KafkaStreams instances of the same 
application execute different Topologies. Note that Topologies are only 
identical if all operators are added in the same order.
   ```
   
   Or similar.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163759#comment-17163759
 ] 

Guozhang Wang commented on KAFKA-10134:
---

[~zhowei] Did your run include both the log4j improvement and the other PR 
depending on fetchable partitions to do long polling?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


mjsax commented on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-663113463


   Btw: The PR should be against `trunk`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459588501



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Oh, this is a `Pool`, so we would have to add a `removeAll` method. 
Seems easy enough though since it can call the relevant method in 
`ConcurrentMap`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163761#comment-17163761
 ] 

Guozhang Wang commented on KAFKA-10134:
---

BTW I found that the main latency during the rebalance is in discovering the 
coordinator: we keep getting "Join group failed with 
org.apache.kafka.common.errors.DisconnectException" and it kept retrying for 
about a minute. But I think you did not shut down the broker in your experiment; 
is there anything else happening that causes that broker node to be unreachable?

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen 
> when there is some load(some are long running tasks >30S) there, the CPU will 
> go sky-high. It reads ~700% in our metrics so there should be several threads 
> are in a tight loop. We have several consumer threads consuming from 
> different partitions during the rebalance. This is reproducible in both the 
> new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The 
> difference is that with old eager rebalance rebalance protocol used the high 
> CPU usage will dropped after the rebalance done. But when using cooperative 
> one, it seems the consumers threads are stuck on something and couldn't 
> finish the rebalance so the high CPU usage won't drop until we stopped our 
> load. Also a small load without long running task also won't cause continuous 
> high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable  
> [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 
> os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 
> runnable  [0x7fe119aab000]   java.lang.Thread.State: RUNNABLE at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> at
>  
> By debugging into the code we found it looks like the clients are  in a loop 
> on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still 
> exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it 
> seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459596274



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   `remoteReplicasMap --= removedReplicas` doesn't compile - the 
`remoteReplicasMap` is a Kafka `Pool`, which is itself backed by a Java map, 
and I don't think those support the `--=` notation
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8582:
--

Assignee: Igor Piddubnyi

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Igor Piddubnyi
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.
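For concreteness, a purely hypothetical sketch of what such a handler interface could look like (the name and signature are invented here for illustration; this is not an existing Kafka Streams API):
```java
public interface ExpiredWindowRecordHandler<K, V> {
    /**
     * Called for an input record that arrives after its window's grace period,
     * e.g. to forward it to a dead-letter topic, emit an alert, or fail fast.
     */
    void handleExpiredRecord(K key, V value, long recordTimestamp);
}
```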



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163770#comment-17163770
 ] 

Matthias J. Sax commented on KAFKA-8582:


Added you to the list of contributors and assigned the ticket to you. You can 
now also self-assign tickets.

> Consider adding an ExpiredWindowRecordHandler to Suppress
> -
>
> Key: KAFKA-8582
> URL: https://issues.apache.org/jira/browse/KAFKA-8582
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Igor Piddubnyi
>Priority: Major
>
> I got some feedback on Suppress:
> {quote}Specifying how to handle events outside the grace period does seem 
> like a business concern, and simply discarding them thus seems risky (for 
> example imagine any situation where money is involved).
> This sort of situation is addressed by the late-triggering approach 
> associated with watermarks 
> (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given 
> this I wondered if you were considering adding anything similar?{quote}
> It seems like, if a record has arrived past the grace period for its window, 
> then the state of the windowed aggregation would already have been lost, so 
> if we were to compute an aggregation result, it would be incorrect. Plus, 
> since the window is already expired, we can't store the new (incorrect, but 
> more importantly expired) aggregation result either, so any subsequent 
> super-late records would also face the same blank-slate. I think this would 
> wind up looking like this: if you have three timely records for a window, and 
> then three more that arrive after the grace period, and you were doing a 
> count aggregation, you'd see the counts emitted for the window as [1, 2, 3, 
> 1, 1, 1]. I guess we could add a flag to the post-expiration results to 
> indicate that they're broken, but this seems like the wrong approach. The 
> post-expiration aggregation _results_ are meaningless, but I could see 
> wanting to send the past-expiration _input records_ to a dead-letter queue or 
> something instead of dropping them.
> Along this line of thinking, I wonder if we should add an optional 
> past-expiration record handler interface to the suppression operator. Then, 
> you could define your own logic, whether it's a dead-letter queue, sending it 
> to some alerting pipeline, or even just crashing the application before it 
> can do something wrong. This would be a similar pattern to how we allow 
> custom logic to handle deserialization errors by supplying a 
> org.apache.kafka.streams.errors.DeserializationExceptionHandler.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation

2020-07-23 Thread GitBox


mjsax commented on pull request #9017:
URL: https://github.com/apache/kafka/pull/9017#issuecomment-663122659


   I found your wiki account and granted write access. You should be all set.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459598439



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {

Review comment:
   This fails incredibly quickly 100/100 times without the Partition.scala 
changes.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459599939



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition,
  isr: Set[Int],
  addingReplicas: Seq[Int],
  removingReplicas: Seq[Int]): Unit = {
-remoteReplicasMap.clear()
+val replicaSet = assignment.toSet
+val removedReplicas = remoteReplicasMap.keys -- replicaSet
+
 assignment
   .filter(_ != localBrokerId)
   .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, 
topicPartition)))
-
+removedReplicas.foreach(remoteReplicasMap.remove)

Review comment:
   Sounds good to introduce the method!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163796#comment-17163796
 ] 

Sophie Blee-Goldman commented on KAFKA-10284:
-

You know, I think we actually hit this too, but weren't able to recognize the 
problem at the time. A few weeks ago one of our StreamThreads/Consumers seemed 
to "take off" from the group at some point, as evidenced by the steadily 
increasing last-rebalance-seconds-ago metric (whereas the other members had 
rebalanced multiple times since then). Right before this occurred we saw that 
same error message in the logs:

 
{code:java}
ERROR given member.id X is identified as a known static member 1,but not 
matching the expected member.id Y (kafka.coordinator.group.GroupMetadata)
{code}
Unfortunately we killed the client while trying to remotely debug it, so we 
couldn't get any more useful information. Would you say that this mysterious 
encounter was likely due to the bug reported here? [~guozhang] [~bchen225242]

 

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> For known static members rejoin, we would update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose for avoiding 
> unnecessary rebalance for static membership, as well as fencing purpose if 
> some still uses the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing up the zombie member 
> identity.
> The bug find credit goes to [~hachikuji] 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#issuecomment-663130390


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459607476



##
File path: core/src/test/scala/unit/kafka/utils/PoolTest.scala
##
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils

Review comment:
   Remove `unit`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-07-23 Thread Ning Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163802#comment-17163802
 ] 

Ning Zhang commented on KAFKA-10000:


Hi Chris, the purpose of this ticket is very interesting. I wonder what its 
priority is in the overall Kafka Connect backlog, and how the progress is so far 
(needs-KIP)? Thanks

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#issuecomment-663134416


   Only the `StreamsStandbyTask.test_standby_tasks_rebalance` system test 
failed, and it's known to be buggy.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163803#comment-17163803
 ] 

Matthias J. Sax commented on KAFKA-7540:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3496/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
{quote}java.lang.AssertionError: Assignment did not complete on time at 
org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.assertTrue(Assert.java:42) at 
kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486) at 
kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257)
 at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220){quote}

> Flaky Test ConsumerBounceTest#testClose
> ---
>
> Key: KAFKA-7540
> URL: https://issues.apache.org/jira/browse/KAFKA-7540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0
>Reporter: John Roesler
>Assignee: Jason Gustafson
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.7.0, 2.6.1
>
>
> Observed on Java 8: 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/]
>  
> Stacktrace:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: -1
>   at 
> kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146)
>   at 
> kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238)
>   at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.

[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163804#comment-17163804
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

And one more: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1645/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager

2020-07-23 Thread GitBox


mjsax commented on pull request #9060:
URL: https://github.com/apache/kafka/pull/9060#issuecomment-663135369


   Jenkins failed on known flaky tests only.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-663136110


   Flaky tests only.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test

2020-07-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163807#comment-17163807
 ] 

Matthias J. Sax commented on KAFKA-10255:
-

[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/]

> Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
> 
>
> Key: KAFKA-10255
> URL: https://issues.apache.org/jira/browse/KAFKA-10255
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 STARTED
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > 
> testOneWayReplicationWithAutorOffsetSync1 FAILED
>  java.lang.AssertionError: consumer record size is not zero expected:<0> but 
> was:<2>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soondenana commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-23 Thread GitBox


soondenana commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r459603473



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -1151,6 +1144,12 @@ class LogManager(logDirs: Seq[File],
   }
 }
   }
+
+  private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: 
TopicPartition): Log = {
+val removedLog = logs.remove(tp)
+if (removedLog != null) removedLog.removeLogMetrics()
+removedLog

Review comment:
   nit: Let's return Option(removedLog) to ease null checking by clients.
   
   Seems like the same object gets returned by `asyncDelete`, but it is only used 
in one place in test code, so we may want to change the return value of that too. 
The less "null" we have the better.

##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File],
   if (cleaner != null)
 cleaner.handleLogDirFailure(dir)
 
-  val offlineCurrentTopicPartitions = currentLogs.collect {
-case (tp, log) if log.parentDir == dir => tp
-  }
-  offlineCurrentTopicPartitions.foreach { topicPartition => {
-val removedLog = currentLogs.remove(topicPartition)
-if (removedLog != null) {
-  removedLog.closeHandlers()
-  removedLog.removeLogMetrics()
+  def removeOfflineLogs(logs: Pool[TopicPartition, Log]): 
Iterable[TopicPartition] = {

Review comment:
   Thanks for deduping this code.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


ijuma commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459616154



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0

Review comment:
   Shouldn't this be inside the thread state?
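   A sketch of what that would look like (the same loop as in the diff above, with the counter declared inside the runnable):
   ```scala
   // The flip counter is now local to the executor task, so it is no longer
   // mutable state shared with the test thread.
   val future = executorService.submit((() => {
     var i = 0
     while (active.get) {
       val replicas = if (i % 2 == 0) firstReplicaSet else secondReplicaSet
       partition.makeLeader(partitionState(replicas), offsetCheckpoints)
       i += 1
       Thread.sleep(1) // just to avoid a tight loop
     }
   }): Runnable)
   ```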

##
File path: core/src/main/scala/kafka/utils/Pool.scala
##
@@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends 
Iterable[(K, V)] {
 
   def remove(key: K, value: V): Boolean = pool.remove(key, value)
 
+  def removeAll(keys: Iterable[K]): Unit = 
pool.keySet().removeAll(keys.asJavaCollection)

Review comment:
   Nit: `()` is not needed.

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()

Review comment:
   No need to repeat `LeaderAndIsrPartitionState` twice.

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0
+val future = executorService.submit((() => {
+  // Flip assignment between two replica sets
+  while (active.get) {
+val replicas = if (i % 2 == 0) {
+  firstReplicaSet
+} else {
+  secondReplicaSet
+}
+
+partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+i += 1
+Thread.sleep(1) // just to avoid tight loop
+  }
+}): Runnable)
+
+val deadline = 5.seconds.fromNow
+while(deadline.hasTimeLeft()) {

Review comment:
   Nit: space missing after `while`.

##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala

[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-23 Thread GitBox


ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-663147296


   Totally agree that it may be better to revisit the tests in MM2. I created 
a ticket https://issues.apache.org/jira/browse/KAFKA-10304 and assigned it to 
myself. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2

2020-07-23 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10304:
--

 Summary: Revisit and improve the tests of MirrorMaker 2
 Key: KAFKA-10304
 URL: https://issues.apache.org/jira/browse/KAFKA-10304
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect
Reporter: Ning Zhang
Assignee: Ning Zhang


Due to the quick development of Kafka MM 2, the unit and integration tests of 
MirrorMaker 2 were written just to cover each individual feature, and some of 
them are simply copy-and-paste from existing tests with small tweaks. It may 
be a good time to revisit and improve the tests, possibly along the following lines:

(1) Are 100 messages enough for integration tests?
(2) What about failures in the middle of integration tests?
(3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset 
sync) beyond the mirrored messages in integration tests?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459640134



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0

Review comment:
   Yeah, nice catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459640854



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0
+val future = executorService.submit((() => {
+  // Flip assignment between two replica sets
+  while (active.get) {
+val replicas = if (i % 2 == 0) {
+  firstReplicaSet
+} else {
+  secondReplicaSet
+}
+
+partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+i += 1
+Thread.sleep(1) // just to avoid tight loop
+  }
+}): Runnable)
+
+val deadline = 5.seconds.fromNow

Review comment:
   I think so. I opted for 5s as I saw the other tests had up to 15s of 
waits for futures. Let me see if 1s can go





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates

2020-07-23 Thread GitBox


stanislavkozlovski commented on a change in pull request #9065:
URL: https://github.com/apache/kafka/pull/9065#discussion_r459645411



##
File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
##
@@ -116,6 +117,56 @@ class PartitionLockTest extends Logging {
 future.get(15, TimeUnit.SECONDS)
   }
 
+  /**
+   * Concurrently calling updateAssignmentAndIsr should always ensure that 
non-lock access
+   * to the inner remoteReplicaMap (accessed by getReplica) cannot see an 
intermediate state
+   * where replicas present both in the old and new assignment are missing
+   */
+  @Test
+  def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
+val active = new AtomicBoolean(true)
+val replicaToCheck = 3
+val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
+val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
+def partitionState(replicas: java.util.List[Integer]): 
LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
+  .setControllerEpoch(1)
+  .setLeader(replicas.get(0))
+  .setLeaderEpoch(1)
+  .setIsr(replicas)
+  .setZkVersion(1)
+  .setReplicas(replicas)
+  .setIsNew(true)
+val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints])
+// Update replica set synchronously first to avoid race conditions
+partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
+assertTrue(s"Expected replica $replicaToCheck to be defined", 
partition.getReplica(replicaToCheck).isDefined)
+
+var i = 0
+val future = executorService.submit((() => {
+  // Flip assignment between two replica sets
+  while (active.get) {
+val replicas = if (i % 2 == 0) {
+  firstReplicaSet
+} else {
+  secondReplicaSet
+}
+
+partition.makeLeader(partitionState(replicas), offsetCheckpoints)
+
+i += 1
+Thread.sleep(1) // just to avoid tight loop
+  }
+}): Runnable)
+
+val deadline = 5.seconds.fromNow

Review comment:
   Lowered to 1s





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-23 Thread GitBox


jeffkbkim commented on a change in pull request #9050:
URL: https://github.com/apache/kafka/pull/9050#discussion_r459647273



##
File path: 
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##
@@ -598,6 +603,86 @@ class ControllerIntegrationTest extends 
ZooKeeperTestHarness {
 }, "Broker fail to initialize after restart")
   }
 
+  @Test
+  def testPreemptionOnControllerShutdown(): Unit = {
+servers = makeServers(1, enableControlledShutdown = false)
+val controller = getController().kafkaController
+val count = new AtomicInteger(2)
+val latch = new CountDownLatch(1)
+val spyThread = spy(controller.eventManager.thread)
+controller.eventManager.setControllerEventThread(spyThread)
+val processedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = latch.await()
+  override def preempt(): Unit = {}
+}
+val preemptedEvent = new MockEvent(ControllerState.TopicChange) {
+  override def process(): Unit = {}
+  override def preempt(): Unit = count.decrementAndGet()
+}
+
+controller.eventManager.put(processedEvent)
+controller.eventManager.put(preemptedEvent)
+controller.eventManager.put(preemptedEvent)
+
+doAnswer((_: InvocationOnMock) => {
+  latch.countDown()
+}).doCallRealMethod().when(spyThread).awaitShutdown()

Review comment:
   @stanislavkozlovski thanks for the comment. I've tried this approach 
before and the test passes but sometimes outputs:
   ```
   [2020-07-23 11:12:43,316] ERROR [RequestSendThread controllerId=0] 
Controller 0 fails to send a request to broker localhost:51542 (id: 0 rack: 
null) (kafka.controller.RequestSendThread:76)
   java.lang.InterruptedException
   ```
   I also see a bit of flakiness with this approach, as we cannot exactly time 
when `latch.countDown()` is called. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()

2020-07-23 Thread GitBox


abbccdda commented on a change in pull request #8512:
URL: https://github.com/apache/kafka/pull/8512#discussion_r459648674



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1388,9 +1388,9 @@ public void commitSync() {
  */
 @Override
 public void commitSync(Duration timeout) {
+maybeThrowInvalidGroupIdException();

Review comment:
   If we throw here, we will not execute the current `finally` block to 
call `release()`
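   For context, a sketch of the control flow under discussion (the commit body is elided; `acquireAndEnsureOpen()` and `release()` are assumed to be the consumer's existing internal acquire/release helpers):
   ```java
   @Override
   public void commitSync(Duration timeout) {
       maybeThrowInvalidGroupIdException(); // throws before the light lock is taken,
                                            // so release() is never reached on this path
       acquireAndEnsureOpen();
       try {
           // ... perform the synchronous commit ...
       } finally {
           release(); // only reached once acquireAndEnsureOpen() has succeeded
       }
   }
   ```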





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-23 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163871#comment-17163871
 ] 

Boyang Chen commented on KAFKA-10284:
-

[~akshaysh] I didn't see any trace in the pasted ticket that the group coordinator got migrated, so it might be a separate issue.

[~ableegoldman] Well, the symptom matches, but I don't know for sure if this is 
the same cause :)

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This serves the purpose of avoiding 
> unnecessary rebalances for static membership, and also fences anyone who 
> still uses the old member.id.
> The bug is that we don't actually persist the membership update, so if no 
> upcoming rebalance gets triggered, this new member.id information will get 
> lost during group coordinator immigration, thus bringing back the zombie 
> member identity.
> Credit for finding the bug goes to [~hachikuji].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9161) Close gaps in Streams configs documentation

2020-07-23 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9161:
---
Fix Version/s: 2.6.0

> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie, newbie++
> Fix For: 2.6.0
>
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna opened a new pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-23 Thread GitBox


cadonna opened a new pull request #9066:
URL: https://github.com/apache/kafka/pull/9066


   In PR #8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2, which is the same value used for the sentinel LATEST_OFFSET that is written into subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed skipping UNKNOWN_OFFSET when computing the sum of the changelog offsets.
   
   If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as the sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream thread had run the task as active, although it might have run it as a standby. This misinterpretation of the sentinel value resulted in unexpected task assignments.
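   
   A minimal, hedged sketch of the fix described above (the constant names and the -2 value are taken from this description, not from the actual Streams sources): skip the unknown-offset sentinel when summing, so a checkpoint consisting only of unknown offsets cannot collapse to -2 and be misread as the "ran as active" marker.
   ```java
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;

   public final class ChangelogOffsetSumSketch {

       // Sentinel values as described above (assumed names/values for illustration).
       static final long UNKNOWN_OFFSET = -2L; // checkpoint entry: offset not known yet
       static final long LATEST_OFFSET = -2L;  // subscription marker: store used by an active task

       // Sum the changelog offsets while skipping the UNKNOWN_OFFSET sentinel, so a
       // task that restored nothing cannot report -2 and be mistaken for LATEST_OFFSET.
       static long sumOfChangelogOffsets(final Map<TopicPartition, Long> changelogOffsets) {
           long sum = 0L;
           for (final long offset : changelogOffsets.values()) {
               if (offset != UNKNOWN_OFFSET) {
                   sum += offset;
               }
           }
           return sum;
       }
   }
   ```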
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-23 Thread GitBox


cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663197156


   Started 10x runs of the failing system test here: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4074/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-23 Thread GitBox


cadonna commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663197372


   Call for review: @ableegoldman @mjsax @vvcephei @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py

2020-07-23 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163911#comment-17163911
 ] 

Bruno Cadonna commented on KAFKA-10287:
---

[~chia7712] Sorry, I was not aware that you assigned this ticket to yourself. 
Could you please review the PR?

> fix flaky streams/streams_standby_replica_test.py
> -
>
> Key: KAFKA-10287
> URL: https://issues.apache.org/jira/browse/KAFKA-10287
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_standby_replica_test
> Class:  StreamsStandbyTask
> Method: test_standby_tasks_rebalance
> {quote}
> It passes occasionally on my local setup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei opened a new pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-23 Thread GitBox


vvcephei opened a new pull request #9067:
URL: https://github.com/apache/kafka/pull/9067


   Fixes this issue:
   ```
   Execution failed for task ':streams:unitTest'.
   > Process 'Gradle Test Executor 69' finished with non-zero exit value 134
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-23 Thread GitBox


vvcephei commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663223325


   Hey @mjsax , can you review this?
   
   It's weird that the error is reported from `unitTest`, but the only call path to the `Exit.exit()` method is through this integration test. It may not completely fix the issue, but it doesn't hurt to put this in for now and see what happens.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


soarez commented on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-663252849


   Thanks for the review, @mjsax. I've changed the merge base to `trunk` and believe I have addressed all your feedback so far. Please take another look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez edited a comment on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-07-23 Thread GitBox


soarez edited a comment on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-663252849


   Thanks for the review, @mjsax. I've changed the base branch to `trunk` and believe I have addressed all your feedback so far. Please take another look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-23 Thread GitBox


mjsax commented on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663255631


   Is this the right fix? I thought we should replace `System.exit()` with `Exit.exit()` to address the issue? -- We call `System.exit()` in multiple system tests. -- And we hav



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-23 Thread GitBox


mjsax commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663256239


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-23 Thread GitBox


mjsax commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-663256489


   @cadonna Should the PR be against `trunk` ?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax edited a comment on pull request #9067: MINOR: Streams integration tests should not call exit

2020-07-23 Thread GitBox


mjsax edited a comment on pull request #9067:
URL: https://github.com/apache/kafka/pull/9067#issuecomment-663255631


   Is this the right fix? I thought we should replace `System.exit()` with `Exit.exit()` to address the issue? -- We call `System.exit()` in multiple system tests.
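
   For reference, a hedged sketch of the `Exit`-based approach being discussed: Kafka's `org.apache.kafka.common.utils.Exit` lets a test install its own exit procedure, so code under test that calls `Exit.exit()` surfaces as a test failure instead of terminating the test JVM. The helper class and method below are illustrative only, not part of the PR.
   ```java
   import org.apache.kafka.common.utils.Exit;

   public final class ExitCaptureSketch {

       // Run test code with Exit intercepted, so a code path that "exits" fails the
       // test instead of killing the Gradle test executor JVM (exit value 134 above).
       public static void runWithExitCaptured(final Runnable codeUnderTest) {
           Exit.setExitProcedure((statusCode, message) -> {
               throw new IllegalStateException("Exit(" + statusCode + "): " + message);
           });
           try {
               codeUnderTest.run();
           } finally {
               Exit.resetExitProcedure(); // restore the default System.exit behaviour
           }
       }
   }
   ```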



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-23 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459762279



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {

Review comment:
   I can't comment above, but can the `poll` call itself throw a timeout 
exception? Or does it always just return no results in that case?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately

Review comment:
   I think this makes sense, but it seems to make some assumptions about the internal implementation of the consumer that don't seem necessary here. Is it important to assert that the consumer can't possibly get a timeout exception here?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
+
+long deadlineMs = NO_DEADLINE;
+final List storesToInitialize = new 
LinkedList<>(globalStateStores);
+
+while (!storesToInitialize.isEmpty()) {
+// we remove and add back on failure to round-robin through all 
stores
+final StateStore stateStore = storesToInitialize.remove(0);
 globalStoreNames.add(stateStore.name());
 final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
 changelogTopics.add(sourceTopic);
-stateStore.init(globalProcessorContext, stateStore);
+
+try {
+stateStore.init(globalProcessorContext, stateStore);
+deadlineMs = NO_DEADLINE;
+} catch (final RetryableErrorException retryableException) {
+if (taskTimeoutMs == 0L) {
+throw new StreamsException(retryableException.getCause());

Review comment:
   Why not just preserve the whole story of what happened, like `throw new 
StreamsException("Couldn't retry because timeout is set to zero", 
retryableException)`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
restoreRecords.add(recordConverter.convert(record));
 }
 }
-offset = globalConsumer.position(topicPartition);
+try {
+offset = globalConsumer.position(topicPartition);
+} catch (final TimeoutException error) {
+// the `globalConsumer.position()` call should never 
block, because we know that we did
+// a successful `position()` call above for the 
requested partition and thus the consumer
+// should have a valid local position that it can 
return immediately
+
+// hence, a `TimeoutException` indicates a bug and 
thus we rethrow it as fatal `IllegalStateException`
+throw new IllegalStateException(error);
+}
+
 stateRestoreAdapter.restoreBatch(restoreRecords);
 stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
 restoreCount += restoreRecords.size();

Review comment:
   Huh, interesting thought. Just to be clear, it looks like it would 
already block forever in this case today, right?
   Yeah, it does seem like we should implement a similar non-progress 

[jira] [Updated] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py

2020-07-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10287:
-
Priority: Blocker  (was: Major)

> fix flaky streams/streams_standby_replica_test.py
> -
>
> Key: KAFKA-10287
> URL: https://issues.apache.org/jira/browse/KAFKA-10287
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
>
> {quote}
> Module: kafkatest.tests.streams.streams_standby_replica_test
> Class:  StreamsStandbyTask
> Method: test_standby_tasks_rebalance
> {quote}
> It passes occasionally on my local setup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py

2020-07-23 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10287:
-
Fix Version/s: 2.6.0

> fix flaky streams/streams_standby_replica_test.py
> -
>
> Key: KAFKA-10287
> URL: https://issues.apache.org/jira/browse/KAFKA-10287
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 2.6.0
>
>
> {quote}
> Module: kafkatest.tests.streams.streams_standby_replica_test
> Class:  StreamsStandbyTask
> Method: test_standby_tasks_rebalance
> {quote}
> It passes occasionally on my local setup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

