[GitHub] [kafka] dajac commented on a change in pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
dajac commented on a change in pull request #9051: URL: https://github.com/apache/kafka/pull/9051#discussion_r459252113 ## File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala ## @@ -775,6 +775,24 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet verifyMarkPartitionsForTruncation() } + @Test + def testDefaultValueRestoredAfterDeleteDynamicConfig(): Unit = { +val newProps = new Properties +newProps.put(KafkaConfig.LogRetentionTimeMillisProp, "10") +newProps.put(KafkaConfig.LogFlushIntervalMsProp, "1") +TestUtils.incrementalAlterConfigs(servers, adminClients.head, newProps, perBrokerConfig = false).all.get Review comment: That makes sense, thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
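For context on what this PR exercises: deleting a dynamic broker config through the Admin API is what lets `log.retention.ms` fall back to its default. A minimal sketch of such a delete, assuming an already-constructed `Admin` client (the class and method names below are illustrative, not taken from the PR):

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DeleteDynamicConfigExample {
    // Deletes the cluster-wide dynamic override for log.retention.ms so the
    // brokers fall back to their statically configured / default value.
    public static void deleteRetentionOverride(Admin admin) throws Exception {
        // An empty resource name targets the cluster-wide dynamic defaults rather than a single broker.
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
        Collection<AlterConfigOp> ops = Collections.singletonList(
            new AlterConfigOp(new ConfigEntry("log.retention.ms", ""), AlterConfigOp.OpType.DELETE));
        admin.incrementalAlterConfigs(Collections.singletonMap(broker, ops)).all().get();
    }
}
```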
[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
dajac commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459258170 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: If we want to guarantee that the `deadlineMs` is respected, I think that we must set the timeout of the AdminClient's call accordingly: `CreateTopicsOptions.timeoutMs`. With the default, I think that the call could be longer than half of `MAX_POLL_INTERVAL_MS_CONFIG`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
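The reviewer's point is that a single `createTopics` call should not be allowed to run past the overall deadline. A rough sketch of bounding the call with `CreateTopicsOptions.timeoutMs`, assuming a `deadlineMs` computed as in the diff above (the helper class and method are hypothetical):

```java
import java.util.Collections;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class BoundedCreateTopics {
    // Caps the admin call's own timeout at the time remaining before deadlineMs,
    // so one createTopics round-trip cannot outlive the overall retry budget.
    public static CreateTopicsResult createWithinDeadline(Admin admin, NewTopic topic,
                                                          long deadlineMs, long nowMs) {
        int remainingMs = (int) Math.max(0, deadlineMs - nowMs);
        CreateTopicsOptions options = new CreateTopicsOptions().timeoutMs(remainingMs);
        return admin.createTopics(Collections.singleton(topic), options);
    }
}
```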
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
stanislavkozlovski commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r459262136 ## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ## @@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { }, "Broker fail to initialize after restart") } + @Test + def testPreemptionOnControllerShutdown(): Unit = { +servers = makeServers(1, enableControlledShutdown = false) +val controller = getController().kafkaController +val count = new AtomicInteger(2) +val latch = new CountDownLatch(1) +val spyThread = spy(controller.eventManager.thread) +controller.eventManager.setControllerEventThread(spyThread) +val processedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = latch.await() + override def preempt(): Unit = {} +} +val preemptedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = {} + override def preempt(): Unit = count.decrementAndGet() +} + +controller.eventManager.put(processedEvent) +controller.eventManager.put(preemptedEvent) +controller.eventManager.put(preemptedEvent) + +doAnswer((_: InvocationOnMock) => { + latch.countDown() +}).doCallRealMethod().when(spyThread).awaitShutdown() Review comment: Could we have the test call `latch.countDown()` in a background thread with a delay, right before we call controller.shutdown? I guess it's possible to have race conditions with that solution This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
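The alternative the reviewer suggests is to release the latch from a background thread after a short delay, right before calling `controller.shutdown()`, instead of stubbing `awaitShutdown()`. A rough Java-flavoured sketch of that scheduling idea (the actual test is Scala, and the helper below is hypothetical):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedLatchRelease {
    // Releases the given latch after delayMs, so a blocking call made right
    // afterwards (e.g. controller.shutdown()) can eventually make progress.
    static ScheduledExecutorService releaseLater(CountDownLatch latch, long delayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(latch::countDown, delayMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

As the reviewer notes, a fixed delay like this can still race with the shutdown path, which is the trade-off against the spy-based approach in the diff.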
[GitHub] [kafka] stanislavkozlovski commented on pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
stanislavkozlovski commented on pull request #9050: URL: https://github.com/apache/kafka/pull/9050#issuecomment-662860332 > add preempt(): Unit method for all ControllerEvent so that all events (and future events) must implement it > for events that have callbacks, move the preemption from individual methods to preempt() > add preemption for ApiPartitionReassignment and ListPartitionReassignments Great idea This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

[jira] [Commented] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344 ] Paul Webb commented on KAFKA-8154: -- Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. When the new provider is added to the back of the list Kafka behaved fine. Hope this helps. > Buffer Overflow exceptions between brokers and with clients > --- > > Key: KAFKA-8154 > URL: https://issues.apache.org/jira/browse/KAFKA-8154 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rajesh Nataraja >Priority: Major > Attachments: server.properties.txt > > > https://github.com/apache/kafka/pull/6495 > https://github.com/apache/kafka/pull/5785 -- This message was sent by Atlassian Jira (v8.3.4#803005)
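The workaround Paul Webb describes amounts to registering the extra provider at the end of the provider list instead of forcing it into position 1, so the JDK's default TLS provider (and its SSLSession buffer sizes) stays in charge for Kafka. A minimal sketch using the standard `java.security.Security` API (the wrapper class is illustrative only):

{code:java}
import java.security.Provider;
import java.security.Security;

public class ProviderRegistration {
    // Appends the provider after the existing ones instead of
    // Security.insertProviderAt(provider, 1), which puts it at the head of the list.
    public static void registerAtEnd(Provider provider) {
        Security.addProvider(provider);
    }
}
{code}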
[jira] [Assigned] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.
[ https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuo Zhang reassigned KAFKA-9343: - Assignee: Shuo Zhang > Add ps command for Kafka and zookeeper process on z/OS. > --- > > Key: KAFKA-9343 > URL: https://issues.apache.org/jira/browse/KAFKA-9343 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 2.4.0 > Environment: z/OS, OS/390 >Reporter: Shuo Zhang >Assignee: Shuo Zhang >Priority: Major > Labels: OS/390, z/OS > Fix For: 2.5.0, 2.4.2 > > Original Estimate: 168h > Remaining Estimate: 168h > > +Note: since the final change scope changed, I changed the summary and > description.+ > The existing method to check Kafka process for other platform doesn't > applicable for z/OS, on z/OS, the best keyword we can use is the JOBNAME. > PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk > '\{print $1}') > --> > PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v > grep | awk '\{print $1}') > So does the zookeeper process. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344 ] Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:52 AM: Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. This is some logging to show the issue: {noformat} 2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider 2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS - 2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Po licy; JavaLoginConfig Configuration) 2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider 2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2) 2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC) 2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM) 2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, Base64, Enveloped, XPath, XPath2, XSLT TransformServices) 2020-07-23 08:13:55.966Z INFO - -> SunPCSC: SunPCSC info: Sun PC/SC provider 2020-07-23 08:13:55.966Z INFO - -> BouncyCastleProvider: BC info: BouncyCastle Security Provider v1.61 2020-07-23 08:13:55.966Z INFO - SECURITY PROVIDERS -{noformat} So my hypothesis is that the new provider ( in this case from Conscrypt) is inserted at the head of the list. When the SSLSession is called (getApplicationBufferSize) it returns MAX_PLAINTEXT_LENGTH which is 2^14 (16384) as per [https://tools.ietf.org/html/rfc5246#section-6.2.1] When the new provider is added to the back of the list Kafka behaved fine and this issued disappeared completely. Hope this helps. was (Author: pwebb.itrs): Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. When the new provider is added to the back of the list Kafka behaved fine. Hope this helps. 
> Buffer Overflow exceptions between brokers and with clients > --- > > Key: KAFKA-8154 > URL: https://issues.apache.org/jira/browse/KAFKA-8154 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Rajesh Nataraja >Priority: Major > Attachments: server.properties.txt > > > https://github.com/apache/kafka/pull/6495 > https://github.com/apache/kafka/pull/5785 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8154) Buffer Overflow exceptions between brokers and with clients
[ https://issues.apache.org/jira/browse/KAFKA-8154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163344#comment-17163344 ] Paul Webb edited comment on KAFKA-8154 at 7/23/20, 8:53 AM: [~rsivaram] Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs with no change. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. This is some logging to show the issue: {noformat} 2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider 2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS - 2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Po licy; JavaLoginConfig Configuration) 2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider 2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2) 2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC) 2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM) 2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSignatureFactory; DOM KeyInfoFactory; C14N 1.0, C14N 1.1, Exclusive C14N, Base64, Enveloped, XPath, XPath2, XSLT TransformServices) 2020-07-23 08:13:55.966Z INFO - -> SunPCSC: SunPCSC info: Sun PC/SC provider 2020-07-23 08:13:55.966Z INFO - -> BouncyCastleProvider: BC info: BouncyCastle Security Provider v1.61 2020-07-23 08:13:55.966Z INFO - SECURITY PROVIDERS -{noformat} So my hypothesis is that the new provider ( in this case from Conscrypt) is inserted at the head of the list. When the SSLSession is called (getApplicationBufferSize) it returns MAX_PLAINTEXT_LENGTH which is 2^14 (16384) as per [https://tools.ietf.org/html/rfc5246#section-6.2.1] When the new provider is added to the back of the list Kafka behaved fine and this issued disappeared completely. Hope this helps. was (Author: pwebb.itrs): Hi. Please note that we were recently affected by this issue. I tried several kafka clients/ brokers and JDKs. The actual cause in the end was that the java security providers was being changed. Specifically: {code:java} Security.insertProviderAt(providerToAdd, 1); {code} By adding at position 1, it appeared that the new provider was returning an invalid application buffer size. This may explain why this could be difficult to reproduce. 
This is some logging to show the issue: {noformat} 2020-07-23 08:13:55.963Z INFO - Adding provider: OpenSSLProvider: Conscrypt info: Android's OpenSSL-backed security provider 2020-07-23 08:13:55.963Z INFO - SECURITY PROVIDERS - 2020-07-23 08:13:55.965Z INFO - -> Sun: SUN info: SUN (DSA key/parameter generation; DSA signing; SHA-1, MD5 digests; SecureRandom; X.509 certificates; JKS & DKS keystores; PKIX CertPathValidator; PKIX CertPathBuilder; LDAP, Collection CertStores, JavaPolicy Po licy; JavaLoginConfig Configuration) 2020-07-23 08:13:55.965Z INFO - -> SunRsaSign: SunRsaSign info: Sun RSA signature provider 2020-07-23 08:13:55.965Z INFO - -> SunEC: SunEC info: Sun Elliptic Curve provider (EC, ECDSA, ECDH) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunJSSE info: Sun JSSE provider(PKCS12, SunX509/PKIX key/trust factories, SSLv3/TLSv1/TLSv1.1/TLSv1.2) 2020-07-23 08:13:55.966Z INFO - -> SunJCE: SunJCE info: SunJCE Provider (implements RSA, DES, Triple DES, AES, Blowfish, ARCFOUR, RC2, PBE, Diffie-Hellman, HMAC) 2020-07-23 08:13:55.966Z INFO - -> SunProvider: SunJGSS info: Sun (Kerberos v5, SPNEGO) 2020-07-23 08:13:55.966Z INFO - -> Provider: SunSASL info: Sun SASL provider(implements client mechanisms for: DIGEST-MD5, GSSAPI, EXTERNAL, PLAIN, CRAM-MD5, NTLM; server mechanisms for: DIGEST-MD5, GSSAPI, CRAM-MD5, NTLM) 2020-07-23 08:13:55.966Z INFO - -> XMLDSigRI: XMLDSig info: XMLDSig (DOM XMLSi
[jira] [Updated] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.
[ https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shuo Zhang updated KAFKA-9343: -- Fix Version/s: 2.6.0 > Add ps command for Kafka and zookeeper process on z/OS. > --- > > Key: KAFKA-9343 > URL: https://issues.apache.org/jira/browse/KAFKA-9343 > Project: Kafka > Issue Type: Task > Components: tools >Affects Versions: 2.4.0 > Environment: z/OS, OS/390 >Reporter: Shuo Zhang >Assignee: Shuo Zhang >Priority: Major > Labels: OS/390, z/OS > Fix For: 2.5.0, 2.6.0, 2.4.2 > > Original Estimate: 168h > Remaining Estimate: 168h > > +Note: since the final change scope changed, I changed the summary and > description.+ > The existing method to check Kafka process for other platform doesn't > applicable for z/OS, on z/OS, the best keyword we can use is the JOBNAME. > PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk > '\{print $1}') > --> > PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v > grep | awk '\{print $1}') > So does the zookeeper process. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon opened a new pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon opened a new pull request #9062: URL: https://github.com/apache/kafka/pull/9062 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-662919173 @feyman2016 @huxihx, since you have experience with this test, could you review this small PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-10205: --- Assignee: Igor Soarez > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: Igor Soarez >Priority: Minor > Labels: beginner, newbie > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. 
> Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163394#comment-17163394 ] Igor Soarez commented on KAFKA-10205: - I'm happy take this one on. > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Priority: Minor > Labels: beginner, newbie > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is 
returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] leonardge opened a new pull request #9063: Fixed deprecated Gradle build Properties.
leonardge opened a new pull request #9063: URL: https://github.com/apache/kafka/pull/9063 The Gradle properties `baseName`, `classifier` and `version` have been deprecated, so I have changed them to `archiveBaseName`, `archiveClassifier` and `archiveVersion`. More information [here](https://docs.gradle.org/6.5/dsl/org.gradle.api.tasks.bundling.Zip.html#org.gradle.api.tasks.bundling.Zip:zip64). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8098) Flaky Test AdminClientIntegrationTest#testConsumerGroups
[ https://issues.apache.org/jira/browse/KAFKA-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163397#comment-17163397 ] Luke Chen commented on KAFKA-8098: -- In the test, we first test removing 1 member from group, and then test removing the other 2 members from group, and it failed sometimes at the 2nd member number assert. After investigation, I found it's because we enabled auto commit for the consumers(default setting), and the removed consumer offset commit will get the {{UNKNOWN_MEMBER_ID}} error, which will then make the member rejoin. (check ConsumerCoordinator#OffsetCommitResponseHandler) So, that's why after the 2nd members removing, the members will sometimes be not empty. I set the consumer config to disable the auto commit to fix this issue. Thanks. > Flaky Test AdminClientIntegrationTest#testConsumerGroups > > > Key: KAFKA-8098 > URL: https://issues.apache.org/jira/browse/KAFKA-8098 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0, 2.2.1 > > > [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3459/tests] > {quote}java.lang.AssertionError: expected:<2> but was:<0> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at org.junit.Assert.assertEquals(Assert.java:633) > at > kafka.api.AdminClientIntegrationTest.testConsumerGroups(AdminClientIntegrationTest.scala:1194){quote} > STDOUT > {quote}2019-03-12 10:52:33,482] ERROR [ReplicaFetcher replicaId=2, > leaderId=1, fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:33,485] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:35,880] WARN Unable to read additional data from client > sessionid 0x104458575770003, likely client has closed socket > (org.apache.zookeeper.server.NIOServerCnxn:376) > [2019-03-12 10:52:38,596] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition elect-preferred-leaders-topic-1-0 at offset > 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:38,797] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition elect-preferred-leaders-topic-2-0 at offset > 0 (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:52:51,998] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
> [2019-03-12 10:52:52,005] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,750] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition mytopic2-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,754] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition mytopic2-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,755] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition mytopic2-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-03-12 10:53:13,760] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition mytopic2-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionExc
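The fix Luke Chen describes above boils down to turning off auto commit for the test consumers, so that a member removed from the group cannot hit UNKNOWN_MEMBER_ID on its offset commit and rejoin. A minimal sketch of such a consumer configuration (the helper class is hypothetical, not the actual test code):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class NonCommittingConsumerProps {
    // Consumers that will be removed from the group should not auto-commit,
    // otherwise the background commit can fail with UNKNOWN_MEMBER_ID and
    // trigger an unwanted rejoin of the group.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }
}
{code}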
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-662968116 Rebased on trunk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485 ] Bruno Cadonna commented on KAFKA-8037: -- I find the idea of putting records in its original byte representation into state stores on source tables in combination with the deserialization of records during restoration interesting. With this we would have in the state store the same data as in the source topic minus the bad data. Having this two mechanism in place would allow us to switch on the optimization by default without any further restrictions on serdes since only the deserializer is used and never the serializer. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485 ] Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:04 PM: - I find the idea of putting records in its original byte representation into state stores on source tables in combination with the deserialization of records during restoration promising. With this we would have in the state store the same data as in the source topic minus the bad data. Having this two mechanism in place would allow us to switch on the optimization by default without any further restrictions on serdes since only the deserializer is used and never the serializer. was (Author: cadonna): I find the idea of putting records in its original byte representation into state stores on source tables in combination with the deserialization of records during restoration interesting. With this we would have in the state store the same data as in the source topic minus the bad data. Having this two mechanism in place would allow us to switch on the optimization by default without any further restrictions on serdes since only the deserializer is used and never the serializer. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data
[ https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163485#comment-17163485 ] Bruno Cadonna edited comment on KAFKA-8037 at 7/23/20, 12:21 PM: - I find the idea of putting records in its original byte representation into state stores on source tables in combination with the deserialization of records during restoration promising. With this we would have in the state store the same data as in the source topic minus the bad data. Having this two mechanism in place would allow us to switch on the optimization by default without any further restrictions on serdes since only one single deserializer is used and never the serializer. was (Author: cadonna): I find the idea of putting records in its original byte representation into state stores on source tables in combination with the deserialization of records during restoration promising. With this we would have in the state store the same data as in the source topic minus the bad data. Having this two mechanism in place would allow us to switch on the optimization by default without any further restrictions on serdes since only the deserializer is used and never the serializer. > KTable restore may load bad data > > > Key: KAFKA-8037 > URL: https://issues.apache.org/jira/browse/KAFKA-8037 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > Labels: pull-request-available > > If an input topic contains bad data, users can specify a > `deserialization.exception.handler` to drop corrupted records on read. > However, this mechanism may be by-passed on restore. Assume a > `builder.table()` call reads and drops a corrupted record. If the table state > is lost and restored from the changelog topic, the corrupted record may be > copied into the store, because on restore plain bytes are copied. > If the KTable is used in a join, an internal `store.get()` call to lookup the > record would fail with a deserialization exception if the value part cannot > be deserialized. > GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for > GlobalKTable case). It's unclear to me atm, how this issue could be addressed > for KTables though. > Note, that user state stores are not affected, because they always have a > dedicated changelog topic (and don't reuse an input topic) and thus the > corrupted record would not be written into the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
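A purely illustrative sketch of the idea discussed here: validate each changelog record with the source topic's deserializer during restoration, drop records that would have been rejected on the normal read path, and still store the original bytes. This is not the actual Streams restore path; the class below is hypothetical:

{code:java}
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueStore;

public class ValidatingRestoreCallback implements StateRestoreCallback {
    private final Deserializer<?> valueDeserializer;
    private final KeyValueStore<Bytes, byte[]> store;
    private final String sourceTopic;

    public ValidatingRestoreCallback(Deserializer<?> valueDeserializer,
                                     KeyValueStore<Bytes, byte[]> store,
                                     String sourceTopic) {
        this.valueDeserializer = valueDeserializer;
        this.store = store;
        this.sourceTopic = sourceTopic;
    }

    @Override
    public void restore(byte[] key, byte[] value) {
        try {
            valueDeserializer.deserialize(sourceTopic, value); // validate only
        } catch (RuntimeException corrupted) {
            return; // skip bad data, mirroring the deserialization exception handler on read
        }
        store.put(Bytes.wrap(key), value); // keep the original byte representation
    }
}
{code}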
[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163529#comment-17163529 ] Igor Piddubnyi commented on KAFKA-8582: --- Hi [~mjsax], as discussed in PR please assign the ticket to me. > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. > Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
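For illustration only, a hypothetical shape of the handler this ticket proposes, mirroring how a DeserializationExceptionHandler is plugged in today. No such interface exists in Kafka Streams; the names below are invented to make the proposal concrete:

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical interface sketched purely to illustrate the proposal in this ticket.
public interface ExpiredWindowRecordHandler<K, V> {
    enum ExpiredWindowRecordHandlerResponse { CONTINUE, FAIL }

    // Called for input records that arrive after their window's grace period,
    // e.g. to forward them to a dead-letter topic or to fail the application.
    ExpiredWindowRecordHandlerResponse handle(ProcessorContext context, K key, V value);
}
{code}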
[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590 ] Thiago Santos commented on KAFKA-9531: -- I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} // code placeholder kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect | at java.lang.Thread.run(Thread.java:748) {code} > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, controller, network, producer >Affects Versions: 2.4.0 >Reporter: Rui Abreu >Priority: Major > > Hello, > > My cluster setup in based on VMs behind DNS CNAME . > Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal > Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients > get stuck on a loop with the exception: > Example after nodeB.internal is replaced with nodeA.internal > > {code:java} > 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer > clientId=consumer-6, groupId=consumer.group] Error connecting to node > nodeB.internal:9092 (id: 2 rack: null) > java.net.UnknownHostException: nodeB.internal:9092 > at java.net.InetAddress.getAllByName0(InetAddress.java:1281) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1127) > ~[?:1.8.0_222] > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943) > ~[stormjar.jar:?] 
> at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005) > ~[stormjar.jar:?] > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > ~[stormjar
[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590 ] Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:44 PM: I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code} was (Author: tcsantos): I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? 
{code:java} // code placeholder kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)kafka-connect | at java.lang.Thread.run(Thread.java:748) {code} > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, controller, network, producer >Affects Versions: 2.4.0 >Reporter: Ru
[jira] [Comment Edited] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163590#comment-17163590 ] Thiago Santos edited comment on KAFKA-9531 at 7/23/20, 1:45 PM: I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} java.net.UnknownHostException: kafka3java.net.UnknownHostException: kafka3 at java.net.InetAddress.getAllByName0(InetAddress.java:1281) at java.net.InetAddress.getAllByName(InetAddress.java:1193) at java.net.InetAddress.getAllByName(InetAddress.java:1127) at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294) at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code} was (Author: tcsantos): I am experiencing the same problem. When i start a local Kafka cluster with docker-compose. The kafka-connect producer gets stuck in this loop when i stop one of the containers in the Kafka cluster. Any update about this issue? {code:java} kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | java.net.UnknownHostException: kafka3kafka-connect | at java.net.InetAddress.getAllByName0(InetAddress.java:1281)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1193)kafka-connect | at java.net.InetAddress.getAllByName(InetAddress.java:1127)kafka-connect | at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)kafka-connect | at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)kafka-connect | at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:962)kafka-connect | at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:350)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)kafka-connect | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239){code} > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, controller, network, producer >Affects Versions: 2.4.0 >Reporter: Rui Abreu >Priority: Major > > Hello, > > My cluster setup in based on VMs behind DNS CNAME . 
> Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal > Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients > get stuck on a loop with the exception: > Example after nodeB.internal is replaced with nodeA.internal > > {code:java} > 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer > clientId=consumer-6, groupId=consumer.group] Error connecting to node > nodeB.internal:9092 (id: 2 rack: null) > java.net.UnknownHostException: nodeB.internal:9092 > at java.net.InetAddress.getAllByName0(InetAddress.java:1281) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAdd
[GitHub] [kafka] soarez opened a new pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
soarez opened a new pull request #9064: URL: https://github.com/apache/kafka/pull/9064 Kafka Streams topologies must be defined in a deterministic order, otherwise users can run into a confusing NullPointerException. This patch adds a more descriptive exception and documentation clarifying the determinism requirement. https://issues.apache.org/jira/browse/KAFKA-10205 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
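A minimal sketch of what "deterministic order" means in practice: derive a stable ordering (for example by sorting topic names) before wiring them into the builder, so every instance of the application constructs the same topology. The helper class is illustrative only, not part of the patch:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class DeterministicTopology {
    // Sorts the topic names before adding them to the builder so that all
    // instances build the topology nodes in exactly the same order.
    public static Topology build(Collection<String> topicNames) {
        List<String> topics = new ArrayList<>(topicNames);
        Collections.sort(topics);
        StreamsBuilder builder = new StreamsBuilder();
        for (String topic : topics) {
            builder.table(topic);
        }
        return builder.build();
    }
}
```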
[jira] [Comment Edited] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163394#comment-17163394 ] Igor Soarez edited comment on KAFKA-10205 at 7/23/20, 2:01 PM: --- I'm happy take this one on. I've opened [https://github.com/apache/kafka/pull/9064] was (Author: soarez): I'm happy take this one on. > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: Igor Soarez >Priority: Minor > Labels: beginner, newbie > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > 
[https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] piddubnyi commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation
piddubnyi commented on pull request #9017: URL: https://github.com/apache/kafka/pull/9017#issuecomment-663037679 Hi @mjsax, I commented on the ticket to get it assigned. Regarding the KIP, I just created a fresh account in Confluence; however, it looks like the account does not have the rights to create the KIP. Please suggest the best way to proceed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteBufferOutputStream; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. 
+ * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { +private final String dest; +private final Consumer sendConsumer; +private final ByteBufferOutputStream byteArrayOutputStream; +private final DataOutput output; +private int mark; + +public RecordsWriter(String dest, Consumer sendConsumer) { +this.dest = dest; +this.sendConsumer = sendConsumer; +this.byteArrayOutputStream = new ByteBufferOutputStream(32); +this.output = new DataOutputStream(this.byteArrayOutputStream); +this.mark = 0; +} + +@Override +public void writeByte(byte val) { +try { +output.writeByte(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeShort(short val) { +try { +output.writeShort(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeInt(int val) { +try { +output.writeInt(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeLong(long val) { +try { +output.writeLong(val); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeDouble(double val) { +try { +ByteUtils.writeDouble(val, output); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeByteArray(byte[] arr) { +try { +output.write(arr); +} catch (IOException e) { +throw new RuntimeException("RecordsWriter encountered an IO error", e); +} +} + +@Override +public void writeUnsignedVarint(int i) { +
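A rough usage sketch of the pattern described in the javadoc above follows. It assumes the constructor and methods shown in the snippet (a `RecordsWriter(String, Consumer<Send>)` constructor, `writeInt`, `writeRecords`, `flush`); the destination id and the use of `MemoryRecords.EMPTY` are placeholders. Treat this as an illustration of the deferred-Send idea in the PR under review, not of a finalized API.
{code:java}
// Illustrative only: drive a RecordsWriter and collect the Send objects it emits.
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.RecordsWriter;
import org.apache.kafka.common.record.MemoryRecords;

public class RecordsWriterSketch {
    public static void main(String[] args) {
        Queue<Send> sends = new ArrayDeque<>();

        // Every completed chunk (buffered bytes or a record-set) is handed to this consumer.
        RecordsWriter writer = new RecordsWriter("broker-1", sends::add);

        writer.writeInt(10);                      // buffered in the internal byte array
        writer.writeRecords(MemoryRecords.EMPTY); // flushes the buffered bytes, then the records, as Sends
        writer.writeInt(20);
        writer.flush();                           // flush whatever is still buffered

        System.out.println("Sends queued: " + sends.size());
    }
}
{code}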
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Summary: Partition#remoteReplicasMap can be empty in certain race conditions (was: RemoteReplicasMap can be empty in certain race conditions) > Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira X > tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10301) RemoteReplicasMap can be empty in certain race conditions
Stanislav Kozlovski created KAFKA-10301: --- Summary: RemoteReplicasMap can be empty in certain race conditions Key: KAFKA-10301 URL: https://issues.apache.org/jira/browse/KAFKA-10301 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira X tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
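To make the two update orders described above concrete, here is a simplified, hypothetical sketch. It uses a plain ConcurrentHashMap in Java rather than the broker's Pool in Partition.scala; the only point is the difference in what a reader that does not take the lock can observe.
{code:java}
// Simplified illustration of the two update orders described in the ticket.
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AssignmentUpdateSketch {
    static final class Replica {
        final int brokerId;
        Replica(int brokerId) { this.brokerId = brokerId; }
    }

    final ConcurrentHashMap<Integer, Replica> remoteReplicas = new ConcurrentHashMap<>();

    // Pre-refactoring order: add the new replicas first, then drop the stale ones.
    // A lock-free reader may briefly see extra replicas, but never an empty map.
    void updateAddThenRemove(Set<Integer> assignment) {
        for (int id : assignment) {
            remoteReplicas.putIfAbsent(id, new Replica(id));
        }
        Set<Integer> stale = new HashSet<>(remoteReplicas.keySet());
        stale.removeAll(assignment);
        for (int id : stale) {
            remoteReplicas.remove(id);
        }
    }

    // Post-refactoring order: clear, then repopulate.
    // A lock-free reader can observe a transiently empty map in between.
    void updateClearThenAdd(Set<Integer> assignment) {
        remoteReplicas.clear();
        for (int id : assignment) {
            remoteReplicas.putIfAbsent(id, new Replica(id));
        }
    }
}
{code}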
[GitHub] [kafka] mimaison commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r459460614 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -62,6 +65,7 @@ private static final int NUM_PARTITIONS = 10; private static final int RECORD_TRANSFER_DURATION_MS = 20_000; private static final int CHECKPOINT_DURATION_MS = 20_000; +private static final int OFFSET_SYNC_DURATION_MS = 30_000; private Time time = Time.SYSTEM; Review comment: We can remove this field now that it's unused ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } +private void waitForConsumerGroupOffsetSync(Consumer consumer, List topics, String consumerGroupId) +throws InterruptedException { +Admin backupClient = backup.kafka().createAdminClient(); +List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +IntStream.range(0, NUM_PARTITIONS).forEach( Review comment: I'm not sure this is much better than a simple `for` loop. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
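For reference, a plain-loop version of building that partition list might look like the sketch below. NUM_PARTITIONS and the helper name are assumed from the test snippet; this only shows the shape of the suggestion, not the actual test code.
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.TopicPartition;

class OffsetSyncTestSketch {
    static final int NUM_PARTITIONS = 10;

    // Builds the same TopicPartition list as the IntStream.range version, with nested for loops.
    static List<TopicPartition> allTopicPartitions(List<String> topics) {
        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
        for (String topic : topics) {
            for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
                tps.add(new TopicPartition(topic, partition));
            }
        }
        return tps;
    }
}
{code}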
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Description: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. was: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira X tracks that. > Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. 
Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10302) Ensure thread-safe access to Partition#remoteReplicasMap
Stanislav Kozlovski created KAFKA-10302: --- Summary: Ensure thread-safe access to Partition#remoteReplicasMap Key: KAFKA-10302 URL: https://issues.apache.org/jira/browse/KAFKA-10302 Project: Kafka Issue Type: Bug Reporter: Stanislav Kozlovski A recent Jira (https://issues.apache.org/jira/browse/KAFKA-10301) exposed how easy it is to introduce nasty race conditions with the Partition#remoteReplicasMap data structure. It is a concurrent map which is modified inside a write lock but it is not always accessed through that lock. Therefore it's possible for callers to access an intermediate state of the map, for instance in between updating the replica assignment for a given partition. It would be good to ensure thread-safe access to the data structure in a way which makes it harder to introduce such regressions in the future -- This message was sent by Atlassian Jira (v8.3.4#803005)
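One way the follow-up could go, sketched very loosely (this is not the Partition code; the class below is hypothetical): publish the replica map as an immutable snapshot behind a volatile reference, so lock-free readers always see either the old or the new assignment and never a half-built map.
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReplicaMapSnapshotSketch {
    static final class Replica {
        final int brokerId;
        Replica(int brokerId) { this.brokerId = brokerId; }
    }

    // Readers dereference this once and get a consistent snapshot.
    private volatile Map<Integer, Replica> remoteReplicas = Collections.emptyMap();

    // Writers (still serialized by the partition's write lock) build the full map first,
    // then publish it with a single reference swap.
    void updateAssignment(Iterable<Integer> assignment, int localBrokerId) {
        Map<Integer, Replica> next = new HashMap<>();
        for (int id : assignment) {
            if (id != localBrokerId) {
                next.put(id, new Replica(id));
            }
        }
        remoteReplicas = Collections.unmodifiableMap(next);
    }

    Map<Integer, Replica> remoteReplicas() {
        return remoteReplicas;
    }
}
{code}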
[jira] [Updated] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-10301: Description: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks further modifications to the code. was: In Partition#updateAssignmentAndIsr, we would previously update the `partition#remoteReplicasMap` by adding the new replicas to the map and then removing the old ones ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] During a recent refactoring, we changed it to first clear the map and then add all the replicas to it ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not all callers that access the map structure use a lock. Some examples: - Partition#updateFollowerFetchState - DelayedDeleteRecords#tryComplete - Partition#getReplicaOrException - called in `checkEnoughReplicasReachOffset` without a lock, which itself is called by DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. While we want to polish the code to ensure these sort of race conditions become harder (or impossible) to introduce, it sounds safest to revert to the previous behavior given the timelines regarding the 2.6 release. Jira https://issues.apache.org/jira/browse/KAFKA-10302 tracks that. 
> Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks further > modifications to the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski opened a new pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski opened a new pull request #9065: URL: https://github.com/apache/kafka/pull/9065 We would previously update the map by adding the new replicas to the map and then removing the old ones. During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it. While this is done in a write lock, not all callers that access the map structure use a lock. It is safer to revert to the previous behavior of showing the intermediate state of the map with extra replicas, rather than an intermediate state of the map with no replicas. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663065197 cc @ijuma This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459527097 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: I decided to not get fancy with refactorings - this is literally the old code (https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10301) Partition#remoteReplicasMap can be empty in certain race conditions
[ https://issues.apache.org/jira/browse/KAFKA-10301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163686#comment-17163686 ] Stanislav Kozlovski commented on KAFKA-10301: - cc [~rhauch] - this would be good to get in 2.6 > Partition#remoteReplicasMap can be empty in certain race conditions > --- > > Key: KAFKA-10301 > URL: https://issues.apache.org/jira/browse/KAFKA-10301 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Blocker > > In Partition#updateAssignmentAndIsr, we would previously update the > `partition#remoteReplicasMap` by adding the new replicas to the map and then > removing the old ones > ([source]([https://github.com/apache/kafka/blob/7f9187fe399f3f6b041ca302bede2b3e780491e7/core/src/main/scala/kafka/cluster/Partition.scala#L657)] > During a recent refactoring, we changed it to first clear the map and then > add all the replicas to it > ([source]([https://github.com/apache/kafka/blob/2.6/core/src/main/scala/kafka/cluster/Partition.scala#L663])) > While this is done in a write lock (`inWriteLock(leaderIsrUpdateLock)`), not > all callers that access the map structure use a lock. Some examples: > - Partition#updateFollowerFetchState > - DelayedDeleteRecords#tryComplete > - Partition#getReplicaOrException - called in > `checkEnoughReplicasReachOffset` without a lock, which itself is called by > DelayedProduce. I think this can fail a `ReplicaManager#appendRecords` call. > While we want to polish the code to ensure these sort of race conditions > become harder (or impossible) to introduce, it sounds safest to revert to the > previous behavior given the timelines regarding the 2.6 release. Jira > https://issues.apache.org/jira/browse/KAFKA-10302 tracks further > modifications to the code. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] AshishRoyJava commented on pull request #9034: KAFKA-10246 : AbstractProcessorContext topic() throws NPE
AshishRoyJava commented on pull request #9034: URL: https://github.com/apache/kafka/pull/9034#issuecomment-663066844 @abbccdda Unit test added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663069630 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] asdaraujo commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
asdaraujo commented on a change in pull request #8730: URL: https://github.com/apache/kafka/pull/8730#discussion_r459536318 ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ## @@ -132,7 +132,7 @@ public String version() { return listConsumerGroupOffsets(group).entrySet().stream() .filter(x -> shouldCheckpointTopic(x.getKey().topic())) .map(x -> checkpoint(group, x.getKey(), x.getValue())) -.filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately +.filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately Review comment: @ryannedolan @heritamas If you guys think this is safe enough I can remove that filter. But it doesn't hurt to leave it there just in case a rogue negative offset comes through for whatever reason/bug... Please let me know what you think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10303) kafka producer says connect failed in cluster mode
Yogesh BG created KAFKA-10303: - Summary: kafka producer says connect failed in cluster mode Key: KAFKA-10303 URL: https://issues.apache.org/jira/browse/KAFKA-10303 Project: Kafka Issue Type: Bug Reporter: Yogesh BG Hi, I am using Kafka broker version 2.3.0. We have two setups: a standalone (one node) setup and a 3-node cluster. We pump a large volume of data, ~25 MBps and ~80K messages per second. It all works well in one-node mode, but with the cluster the producer starts throwing connect failed (librdkafka) after some time, then is able to connect again and resumes sending traffic. What could be the issue? Some of the configurations are: replica.fetch.max.bytes=10485760 num.network.threads=12 num.replica.fetchers=6 queued.max.requests=5 # The number of threads doing disk I/O num.io.threads=12 replica.socket.receive.buffer.bytes=1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663088134 Currently working on introducing a test case for this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
mimaison commented on a change in pull request #9007: URL: https://github.com/apache/kafka/pull/9007#discussion_r459540733 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2306,6 +2310,22 @@ void handleFailure(Throwable throwable) { return new DescribeLogDirsResult(new HashMap<>(futures)); } +private Map logDirDescriptions(DescribeLogDirsResponse response) { Review comment: This can be static. Also should we keep it in `DescribeLogDirsResponse`? ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { +return prepareDescribeLogDirsResponse(error, logDir, +prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); +} + +private List prepareDescribeLogDirsTopics( +long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) { +return singletonList(new DescribeLogDirsTopic() +.setName(topic) +.setPartitions(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsPartition() +.setPartitionIndex(partition) +.setPartitionSize(partitionSize) +.setIsFutureKey(isFuture) +.setOffsetLag(offsetLag; +} + +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, + List topics) { +return new DescribeLogDirsResponse( +new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult() +.setErrorCode(error.code()) +.setLogDir(logDir) +.setTopics(topics) +))); +} + +@Test +public void testDescribeLogDirs() throws ExecutionException, InterruptedException { +Set brokers = Collections.singleton(0); +String logDir = "/var/data/kafka"; +TopicPartition tp = new TopicPartition("topic", 12); +long partitionSize = 1234567890; +long offsetLag = 24; + +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); +env.kafkaClient().prepareResponseFrom( +prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag), +env.cluster().nodeById(0)); + +DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers); + +Map>> descriptions = result.descriptions(); +assertEquals(brokers, descriptions.keySet()); +assertNotNull(descriptions.get(0)); +assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag); + +Map> allDescriptions = result.allDescriptions().get(); +assertEquals(brokers, allDescriptions.keySet()); +assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag); +} +} + +private void assertDescriptionContains(Map descriptionsMap, String logDir, Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag) { Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, 
String logDir, TopicPartition tp, long partitionSize, long offsetLag) { +return prepareDescribeLogDirsResponse(error, logDir, +prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false)); +} + +private List prepareDescribeLogDirsTopics( Review comment: This can be static ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -1057,6 +1059,263 @@ public void testDescribeConfigsUnrequested() throws Exception { } } +private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459563205 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: Would `remoteReplicasMap --= removedReplicas` work here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
mimaison commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-663094946 While not part of your changes, I noticed the test's assumptions are pretty loose. For example, we assume https://github.com/apache/kafka/pull/9029/files#diff-a03d58195cfe119d0b1ed2693cd0d691L362 always consumes all 100 messages. The test also assumes there are no duplicates. While this may be fine when running in memory, Connect semantics are at least once. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459577643 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: Good question. Default `max.poll.interval.ms` is 5 minutes (i.e., the deadline is set to 2.5 minutes by default) while the default `api.default.timeout.ms` is 1 minute. Thus we might be ok? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163747#comment-17163747 ] Matthias J. Sax commented on KAFKA-10255: - Two more: * [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1630/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] * [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3480/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163749#comment-17163749 ] Matthias J. Sax commented on KAFKA-9013: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7491/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/] > Flaky Test MirrorConnectorsIntegrationTest#testReplication > -- > > Key: KAFKA-9013 > URL: https://issues.apache.org/jira/browse/KAFKA-9013 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Bruno Cadonna >Priority: Major > Labels: flaky-test > > h1. Stacktrace: > {code:java} > java.lang.AssertionError: Condition not met within timeout 2. Offsets not > translated downstream to primary cluster. > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239) > {code} > h1. Standard Error > {code} > Standard Error > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.RootResource registered in > SERVER runtime does not implement any provider interfaces applicable in the > SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. > Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors > WARNING: The following warnings have been detected: WARNING: The > (sub)resource method listLoggers in > org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains > empty path annotation. > WARNING: The (sub)resource method listConnectors in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. > WARNING: The (sub)resource method createConnector in > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains > empty path annotation. 
> WARNING: The (sub)resource method listConnectorPlugins in > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > contains empty path annotation. > WARNING: The (sub)resource method serverInfo in > org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty > path annotation. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource > registered in SERVER runtime does not implement any provider interfaces > applicable in the SERVER runtime. Due to constraint configuration problems > the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will > be ignored. > Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers > checkProviderRuntime > WARNING: A provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered > in SERVER runtime does not implement any provider interfaces applicable in > the SERVER runtime. Due to constraint configuration problems the provider > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be > ignored. > Oct 09, 2019 11:32:02 PM org.gla
[GitHub] [kafka] mjsax commented on pull request #9052: MINOR: TopologyTestDriver should not require dummy parameters
mjsax commented on pull request #9052: URL: https://github.com/apache/kafka/pull/9052#issuecomment-663107263 Only flaky tests failed. I updated the corresponding Jira tickets. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-663109443 @chia7712: Only 6 test failures in the latest run with your PR. http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-23--001.1595503536--chia7712--fix_8334_avoid_deadlock--3462b0008/report.html I will do another run on trunk for comparison. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
dajac commented on a change in pull request #9060: URL: https://github.com/apache/kafka/pull/9060#discussion_r459582710 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -96,13 +104,15 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams // have existed with the expected number of partitions, or some create topic returns fatal errors. log.debug("Starting to validate internal topics {} in partition assignor.", topics); -int remainingRetries = retries; +long currentWallClockMs = time.milliseconds(); +final long deadlineMs = currentWallClockMs + retryTimeoutMs; Review comment: That's right. I misread the default value of `max.poll.interval.ms`, too many zeros for my eyes ;). The default works fine then. Do we want to protect ourselves if the user changes the default? Or shall we just call out that `api.default.timeout.ms` should be lower than `max.poll.interval.ms` somewhere? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
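If the goal is to stay safe even when users change those defaults, one option is to cap each AdminClient call at the time remaining until the deadline. The sketch below assumes a deadlineMs tracked by the retry loop, as in the diff above; method and class names are made up for illustration.
{code:java}
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.utils.Time;

class BoundedCreateTopicsSketch {
    // Cap the per-call timeout so a single createTopics round trip cannot overshoot deadlineMs,
    // regardless of what api.default.timeout.ms is configured to.
    static CreateTopicsResult createTopicsWithDeadline(Admin adminClient,
                                                       Set<NewTopic> newTopics,
                                                       long deadlineMs,
                                                       Time time) {
        long remainingMs = Math.max(0L, deadlineMs - time.milliseconds());
        CreateTopicsOptions options = new CreateTopicsOptions()
            .timeoutMs((int) Math.min(Integer.MAX_VALUE, remainingMs));
        return adminClient.createTopics(newTopics, options);
    }
}
{code}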
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-662810186 > Could you rebase again? I will run the system tests after that. Thanks. Done. The known flaky ```group_mode_transactions_test.py``` failure is tracked by #9059 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
mjsax commented on a change in pull request #9064: URL: https://github.com/apache/kafka/pull/9064#discussion_r459581717 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -49,6 +49,9 @@ /** * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology. * + * It is a requirement that the processing logic (Topology) be defined in a deterministic way. Review comment: Nit: insert `` tag to actually get the new paragraph rendered. Nit: `Topology -> `{@link Topology}` It's not really clear what "deterministic" means. We should elaborate more. ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -49,6 +49,9 @@ /** * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology. * + * It is a requirement that the processing logic (Topology) be defined in a deterministic way. + * If different instances build different runtime code logic the resulting behavior may be unexpected. Review comment: "different" for sure, but this implies that one might have an operator the other does not. The observed issue is, that even if both contain the same operator, they might be added in different order (and thus be named differently) to the `Topology`, thus we should stretch that order matters. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1984,6 +1985,24 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { task.initializeStateStores(); } +@Test(expected = TopologyException.class) Review comment: We should not use this annotation but rather use `assertThrows` (we still have some code that does not use `assertThrows` but we try to lazily migrate our tests, as it provides a better test pattern). ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -181,8 +182,16 @@ public StreamTask(final TaskId id, final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor(); final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler(); +final Set sourceTopics = topology.sourceTopics(); for (final TopicPartition partition : partitions) { -final SourceNode source = topology.source(partition.topic()); +final String topicName = partition.topic(); +if (!sourceTopics.contains(topicName)) { +throw new TopologyException( +"Topic not found " + topicName + ". Is the Streams Topology built in a deterministic way?" Review comment: `Topic not found` sounds like as-if the topic was not found in the cluster -- however, what actually happened is that we received a record but the record's topic is unknown in the sub-topology. Similar to above, "deterministic" is not really easy to understand. I would also not phrase it as a question, but as a statement: ``` ... This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order. ``` Or similar. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
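As an aside on the assertThrows suggestion in that review, the conversion typically looks like the sketch below. The test and helper names here are hypothetical, not the actual StreamTaskTest code.
{code:java}
import static org.junit.Assert.assertThrows;

import org.apache.kafka.streams.errors.TopologyException;
import org.junit.Test;

public class AssertThrowsStyleSketch {

    // Instead of @Test(expected = TopologyException.class), scope the expectation
    // to the single call that should fail.
    @Test
    public void shouldThrowTopologyExceptionForUnknownTopic() {
        assertThrows(TopologyException.class, () -> createTaskForUnknownTopic());
    }

    // Placeholder for whatever the real test sets up.
    private void createTaskForUnknownTopic() {
        throw new TopologyException("Unknown topic: test-topic");
    }
}
{code}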
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163759#comment-17163759 ] Guozhang Wang commented on KAFKA-10134: --- [~zhowei] Did your run include both the log4j improvement and the other PR depending on fetchable partitions to do long polling? > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
mjsax commented on pull request #9064: URL: https://github.com/apache/kafka/pull/9064#issuecomment-663113463 Btw: The PR should be against `trunk`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459588501 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: Oh, this is a `Pool`, so we would have to add a `removeAll` method. Seems easy enough though since it can call the relevant method in `ConcurrentMap`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
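A small, self-contained Java sketch of the `ConcurrentMap` behaviour such a `Pool.removeAll` helper could delegate to; the replica IDs below are illustrative. The key-set view is backed by the map, so removing keys from the view removes the corresponding entries without ever clearing the whole map.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RemoveAllDemo {
    public static void main(final String[] args) {
        final ConcurrentMap<Integer, String> remoteReplicas = new ConcurrentHashMap<>();
        remoteReplicas.put(3, "replica-3");
        remoteReplicas.put(4, "replica-4");
        remoteReplicas.put(5, "replica-5");

        // Remove only the replicas that dropped out of the assignment (4 and 5).
        // Readers concurrently looking up replica 3 never observe an empty map,
        // unlike a clear-and-repopulate approach.
        remoteReplicas.keySet().removeAll(Arrays.asList(4, 5));

        System.out.println(remoteReplicas); // prints {3=replica-3}
    }
}
```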
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163761#comment-17163761 ] Guozhang Wang commented on KAFKA-10134: --- BTW I found that the main latency during rebalance is on discovering the coordinator while we keep getting "Join group failed with org.apache.kafka.common.errors.DisconnectException" and it kept retrying for about a minute. But I think you did not shutdown the broker in your experiment, is there anything else happening that cause that broker node to be not reachable? > High CPU issue during rebalance in Kafka consumer after upgrading to 2.5 > > > Key: KAFKA-10134 > URL: https://issues.apache.org/jira/browse/KAFKA-10134 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Sean Guo >Assignee: Guozhang Wang >Priority: Blocker > Fix For: 2.6.0, 2.5.1 > > Attachments: consumer5.log.2020-07-22.log > > > We want to utilize the new rebalance protocol to mitigate the stop-the-world > effect during the rebalance as our tasks are long running task. > But after the upgrade when we try to kill an instance to let rebalance happen > when there is some load(some are long running tasks >30S) there, the CPU will > go sky-high. It reads ~700% in our metrics so there should be several threads > are in a tight loop. We have several consumer threads consuming from > different partitions during the rebalance. This is reproducible in both the > new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The > difference is that with old eager rebalance rebalance protocol used the high > CPU usage will dropped after the rebalance done. But when using cooperative > one, it seems the consumers threads are stuck on something and couldn't > finish the rebalance so the high CPU usage won't drop until we stopped our > load. Also a small load without long running task also won't cause continuous > high CPU usage as the rebalance can finish in that case. > > "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 > cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable > [0x7fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 > os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 > runnable [0x7fe119aab000] java.lang.Thread.State: RUNNABLE at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) > at > > By debugging into the code we found it looks like the clients are in a loop > on finding the coordinator. > I also tried the old rebalance protocol for the new version the issue still > exists but the CPU will be back to normal when the rebalance is done. > Also tried the same on the 2.4.1 which seems don't have this issue. So it > seems related something changed between 2.4.1 and 2.5.0. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459596274 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: `remoteReplicasMap --= removedReplicas` doesn't compile - the `remoteReplicasMap` is using a Kafka `Pool` class which itself is using a Java Map and I don't think they support the `--=` notation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-8582: -- Assignee: Igor Piddubnyi > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Igor Piddubnyi >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. > Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8582) Consider adding an ExpiredWindowRecordHandler to Suppress
[ https://issues.apache.org/jira/browse/KAFKA-8582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163770#comment-17163770 ] Matthias J. Sax commented on KAFKA-8582: Added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. > Consider adding an ExpiredWindowRecordHandler to Suppress > - > > Key: KAFKA-8582 > URL: https://issues.apache.org/jira/browse/KAFKA-8582 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Igor Piddubnyi >Priority: Major > > I got some feedback on Suppress: > {quote}Specifying how to handle events outside the grace period does seem > like a business concern, and simply discarding them thus seems risky (for > example imagine any situation where money is involved). > This sort of situation is addressed by the late-triggering approach > associated with watermarks > (https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102), given > this I wondered if you were considering adding anything similar?{quote} > It seems like, if a record has arrived past the grace period for its window, > then the state of the windowed aggregation would already have been lost, so > if we were to compute an aggregation result, it would be incorrect. Plus, > since the window is already expired, we can't store the new (incorrect, but > more importantly expired) aggregation result either, so any subsequent > super-late records would also face the same blank-slate. I think this would > wind up looking like this: if you have three timely records for a window, and > then three more that arrive after the grace period, and you were doing a > count aggregation, you'd see the counts emitted for the window as [1, 2, 3, > 1, 1, 1]. I guess we could add a flag to the post-expiration results to > indicate that they're broken, but this seems like the wrong approach. The > post-expiration aggregation _results_ are meaningless, but I could see > wanting to send the past-expiration _input records_ to a dead-letter queue or > something instead of dropping them. > Along this line of thinking, I wonder if we should add an optional > past-expiration record handler interface to the suppression operator. Then, > you could define your own logic, whether it's a dead-letter queue, sending it > to some alerting pipeline, or even just crashing the application before it > can do something wrong. This would be a similar pattern to how we allow > custom logic to handle deserialization errors by supplying a > org.apache.kafka.streams.errors.DeserializationExceptionHandler. -- This message was sent by Atlassian Jira (v8.3.4#803005)
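To make the proposal concrete, here is a hypothetical Java sketch of what such a past-expiration handler could look like, mirroring the shape of `DeserializationExceptionHandler`; the interface name, method signature, and the log-and-continue implementation are assumptions for illustration, not an existing Kafka Streams API.

```java
// Hypothetical interface for handling input records that arrive after their window's
// grace period, analogous in spirit to DeserializationExceptionHandler.
public interface ExpiredWindowRecordHandler<K, V> {

    enum ExpiredWindowHandlerResponse {
        CONTINUE, // drop the record (after side effects such as logging) and keep processing
        FAIL      // shut the application down before it can do something wrong
    }

    ExpiredWindowHandlerResponse handle(K key, V value, long recordTimestamp, long windowEndTime);
}

// One possible implementation: log the late record and continue. A real handler might
// instead produce the record to a dead-letter topic or an alerting pipeline.
class LogAndContinueExpiredWindowHandler<K, V> implements ExpiredWindowRecordHandler<K, V> {
    @Override
    public ExpiredWindowHandlerResponse handle(final K key, final V value,
                                               final long recordTimestamp, final long windowEndTime) {
        System.err.printf("Dropping late record for key %s: timestamp %d is past window end %d%n",
                          key, recordTimestamp, windowEndTime);
        return ExpiredWindowHandlerResponse.CONTINUE;
    }
}
```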
[GitHub] [kafka] mjsax commented on pull request #9017: KAFKA-8582 [WIP] Add ability to handle late messages in streams-aggregation
mjsax commented on pull request #9017: URL: https://github.com/apache/kafka/pull/9017#issuecomment-663122659 I found your wiki account and granted write access. You should be all set. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459598439 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { Review comment: This fails incredibly quickly 100/100 times without the Partition.scala changes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459599939 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { -remoteReplicasMap.clear() +val replicaSet = assignment.toSet +val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - +removedReplicas.foreach(remoteReplicasMap.remove) Review comment: Sounds good to introduce the method! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163796#comment-17163796 ] Sophie Blee-Goldman commented on KAFKA-10284: - You know, I think we actually hit this too, but weren't able to recognize the problem at the time. A few weeks ago one of our StreamThreads/Consumers seemed to "take off" from the group at some point, as evidenced by the steadily increasing last-rebalance-seconds-ago metric (whereas the other members had rebalanced multiple times since then). Right before this occurred we saw that same error message in the logs: {code:java} ERROR given member.id X is identified as a known static member 1,but not matching the expected member.id Y (kafka.coordinator.group.GroupMetadata) {code} Unfortunately we killed the client trying to remotely debug it so we couldn't get any more useful information. Would you say that this was mysterious encounter was likely due to the bug reported here? [~guozhang] [~bchen225242] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on pull request #9065: URL: https://github.com/apache/kafka/pull/9065#issuecomment-663130390 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459607476 ## File path: core/src/test/scala/unit/kafka/utils/PoolTest.scala ## @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.utils Review comment: Remove `unit`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163802#comment-17163802 ] Ning Zhang commented on KAFKA-1: Hi Chris, the purpose of this ticket is very interesting. I wonder what is the priority in the overall Kafka Connect backlog, or how is the progress so far (needs-KIP)? Thanks > Atomic commit of source connector records and offsets > - > > Key: KAFKA-1 > URL: https://issues.apache.org/jira/browse/KAFKA-1 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Labels: needs-kip > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately. -- This message was sent by Atlassian Jira (v8.3.4#803005)
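A rough Java sketch of the "commit offsets only after every record has been acked" idea from the ticket description; the class, fields, and `persistOffsets` stand-in are illustrative assumptions, not the Connect framework API or the eventual design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SourceOffsetCommitSketch {
    // Source offsets for the batch we are about to commit, e.g. {"file": "position=1234"}.
    private Map<String, String> pendingOffsets;
    private final List<Future<?>> outstandingSends = new ArrayList<>();

    void recordSent(final Future<?> ack) {
        outstandingSends.add(ack);
    }

    // Commit source offsets if and only if all records up to this point have been
    // acked by the producer; a failed send surfaces here before any offset is written.
    void maybeCommitOffsets() throws InterruptedException, ExecutionException {
        for (final Future<?> ack : outstandingSends) {
            ack.get(); // blocks until the producer acks, or throws on a failed send
        }
        outstandingSends.clear();
        persistOffsets(pendingOffsets);
    }

    private void persistOffsets(final Map<String, String> offsets) {
        // stand-in for writing to the Connect offsets store
        System.out.println("committing offsets " + offsets);
    }
}
```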
[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on pull request #9060: URL: https://github.com/apache/kafka/pull/9060#issuecomment-663134416 Only the `StreamsStandbyTask.test_standby_tasks_rebalance` system test failed, and it's known to be buggy. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7540) Flaky Test ConsumerBounceTest#testClose
[ https://issues.apache.org/jira/browse/KAFKA-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163803#comment-17163803 ] Matthias J. Sax commented on KAFKA-7540: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3496/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] {quote}java.lang.AssertionError: Assignment did not complete on time at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at kafka.api.ConsumerBounceTest.checkClosedState(ConsumerBounceTest.scala:486) at kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:257) at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:220){quote} > Flaky Test ConsumerBounceTest#testClose > --- > > Key: KAFKA-7540 > URL: https://issues.apache.org/jira/browse/KAFKA-7540 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 2.2.0 >Reporter: John Roesler >Assignee: Jason Gustafson >Priority: Critical > Labels: flaky-test > Fix For: 2.7.0, 2.6.1 > > > Observed on Java 8: > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/17314/testReport/junit/kafka.api/ConsumerBounceTest/testClose/] > > Stacktrace: > {noformat} > java.lang.ArrayIndexOutOfBoundsException: -1 > at > kafka.integration.KafkaServerTestHarness.killBroker(KafkaServerTestHarness.scala:146) > at > kafka.api.ConsumerBounceTest.checkCloseWithCoordinatorFailure(ConsumerBounceTest.scala:238) > at kafka.api.ConsumerBounceTest.testClose(ConsumerBounceTest.scala:211) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163804#comment-17163804 ] Matthias J. Sax commented on KAFKA-10255: - And one more: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1645/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on pull request #9060: KAFKA-9274: Remove `retries` from InternalTopicManager
mjsax commented on pull request #9060: URL: https://github.com/apache/kafka/pull/9060#issuecomment-663135369 Jenkins failed on known flaky tests only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-663136110 Flaky tests only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163807#comment-17163807 ] Matthias J. Sax commented on KAFKA-10255: - [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3497/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testOneWayReplicationWithAutorOffsetSync1/] > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] soondenana commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
soondenana commented on a change in pull request #9054: URL: https://github.com/apache/kafka/pull/9054#discussion_r459603473 ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -1151,6 +1144,12 @@ class LogManager(logDirs: Seq[File], } } } + + private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Log = { +val removedLog = logs.remove(tp) +if (removedLog != null) removedLog.removeLogMetrics() +removedLog Review comment: nit: Let's return Option(removedLog) to ease null checking by clients. Seems like the same object gets returned by `asyncDelete` but is only used in one place in test code, so we may want to change the return value of that too. The less "null" we have the better. ## File path: core/src/main/scala/kafka/log/LogManager.scala ## @@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File], if (cleaner != null) cleaner.handleLogDirFailure(dir) - val offlineCurrentTopicPartitions = currentLogs.collect { -case (tp, log) if log.parentDir == dir => tp - } - offlineCurrentTopicPartitions.foreach { topicPartition => { -val removedLog = currentLogs.remove(topicPartition) -if (removedLog != null) { - removedLog.closeHandlers() - removedLog.removeLogMetrics() + def removeOfflineLogs(logs: Pool[TopicPartition, Log]): Iterable[TopicPartition] = { Review comment: Thanks for deduping this code. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
ijuma commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459616154 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 Review comment: Shouldn't this be inside the thread state? ## File path: core/src/main/scala/kafka/utils/Pool.scala ## @@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def remove(key: K, value: V): Boolean = pool.remove(key, value) + def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection) Review comment: Nit: `()` is not needed. ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() Review comment: No need to repeat `LeaderAndIsrPartitionState` twice. 
## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 +val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { +val replicas = if (i % 2 == 0) { + firstReplicaSet +} else { + secondReplicaSet +} + +partition.makeLeader(partitionState(replicas), offsetCheckpoints) + +i += 1 +Thread.sleep(1) // just to avoid tight loop + } +}): Runnable) + +val deadline = 5.seconds.fromNow +while(deadline.hasTimeLeft()) { Review comment: Nit: space missing after `while`. ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-663147296 Totally agree that it may be better to revisit the tests in MM2, and I created a ticket https://issues.apache.org/jira/browse/KAFKA-10304 and assigned it to myself. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10304) Revisit and improve the tests of MirrorMaker 2
Ning Zhang created KAFKA-10304: -- Summary: Revisit and improve the tests of MirrorMaker 2 Key: KAFKA-10304 URL: https://issues.apache.org/jira/browse/KAFKA-10304 Project: Kafka Issue Type: Test Components: KafkaConnect Reporter: Ning Zhang Assignee: Ning Zhang Due to the quick development of Kafka MM 2, unit and integration tests of MirrorMaker 2 were made just to cover each individual feature, and some of them are simply copied and pasted from the existing tests with small tweaks. It may be a good time to revisit and improve the tests, possibly in the following ways: (1) Are 100 messages good enough for integration tests? (2) What about failures in the middle of integration tests? (3) Do we want to check other messages (e.g. checkpoint, heartbeat, offset sync..) beyond the mirrored messages in integration tests? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459640134 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 Review comment: Yeah, nice catch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459640854 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 +val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { +val replicas = if (i % 2 == 0) { + firstReplicaSet +} else { + secondReplicaSet +} + +partition.makeLeader(partitionState(replicas), offsetCheckpoints) + +i += 1 +Thread.sleep(1) // just to avoid tight loop + } +}): Runnable) + +val deadline = 5.seconds.fromNow Review comment: I think so. I opted for 5s as I saw the other tests had up to 15s of waits for futures. Let me see if 1s can go This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #9065: KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates
stanislavkozlovski commented on a change in pull request #9065: URL: https://github.com/apache/kafka/pull/9065#discussion_r459645411 ## File path: core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala ## @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { +val active = new AtomicBoolean(true) +val replicaToCheck = 3 +val firstReplicaSet = Seq[Integer](3, 4, 5).asJava +val secondReplicaSet = Seq[Integer](1, 2, 3).asJava +def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) +val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) +// Update replica set synchronously first to avoid race conditions +partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) +assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + +var i = 0 +val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { +val replicas = if (i % 2 == 0) { + firstReplicaSet +} else { + secondReplicaSet +} + +partition.makeLeader(partitionState(replicas), offsetCheckpoints) + +i += 1 +Thread.sleep(1) // just to avoid tight loop + } +}): Runnable) + +val deadline = 5.seconds.fromNow Review comment: Lowered to 1s This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeffkbkim commented on a change in pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
jeffkbkim commented on a change in pull request #9050: URL: https://github.com/apache/kafka/pull/9050#discussion_r459647273 ## File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala ## @@ -598,6 +603,86 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { }, "Broker fail to initialize after restart") } + @Test + def testPreemptionOnControllerShutdown(): Unit = { +servers = makeServers(1, enableControlledShutdown = false) +val controller = getController().kafkaController +val count = new AtomicInteger(2) +val latch = new CountDownLatch(1) +val spyThread = spy(controller.eventManager.thread) +controller.eventManager.setControllerEventThread(spyThread) +val processedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = latch.await() + override def preempt(): Unit = {} +} +val preemptedEvent = new MockEvent(ControllerState.TopicChange) { + override def process(): Unit = {} + override def preempt(): Unit = count.decrementAndGet() +} + +controller.eventManager.put(processedEvent) +controller.eventManager.put(preemptedEvent) +controller.eventManager.put(preemptedEvent) + +doAnswer((_: InvocationOnMock) => { + latch.countDown() +}).doCallRealMethod().when(spyThread).awaitShutdown() Review comment: @stanislavkozlovski thanks for the comment. i've tried this approach before and the test passes but sometimes output: ``` [2020-07-23 11:12:43,316] ERROR [RequestSendThread controllerId=0] Controller 0 fails to send a request to broker localhost:51542 (id: 0 rack: null) (kafka.controller.RequestSendThread:76) java.lang.InterruptedException ``` i also see a bit of flakiness with this approach as we cannot exactly time when `latch.countDown()` is called. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8512: KAFKA-6024: Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
abbccdda commented on a change in pull request #8512: URL: https://github.com/apache/kafka/pull/8512#discussion_r459648674 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ## @@ -1388,9 +1388,9 @@ public void commitSync() { */ @Override public void commitSync(Duration timeout) { +maybeThrowInvalidGroupIdException(); Review comment: If we throw here, we will not execute the current `finally` block to call `release()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
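A stand-alone Java sketch of the acquire/try/finally/release structure the comment refers to; the class and its helpers are simplified stand-ins, not the real `KafkaConsumer` implementation. It illustrates the consequence being pointed out: if the validation throws before `acquireAndEnsureOpen()`, the `try`/`finally` block is never entered, so `release()` is never called.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the pattern in KafkaConsumer.commitSync; not the real implementation.
class AcquireReleaseSketch {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final String groupId; // null means no group.id was configured

    AcquireReleaseSketch(final String groupId) {
        this.groupId = groupId;
    }

    void commitSync(final Duration timeout) {
        // Validation placed before acquireAndEnsureOpen(): if it throws, the try/finally
        // below is never entered and release() is never called.
        maybeThrowInvalidGroupIdException();
        acquireAndEnsureOpen();
        try {
            // ... perform the synchronous offset commit within the given timeout ...
        } finally {
            release(); // paired with the acquire above
        }
    }

    private void maybeThrowInvalidGroupIdException() {
        if (groupId == null) {
            throw new IllegalStateException("Consumer is not part of a consumer group");
        }
    }

    private void acquireAndEnsureOpen() {
        currentThread.set(Thread.currentThread().getId());
    }

    private void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }
}
```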
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163871#comment-17163871 ] Boyang Chen commented on KAFKA-10284: - [~akshaysh] I didn't see any trace that the group coordinator gets migrated in the pasted ticket, so it might be a separate issue. [~ableegoldman] Well, the symptom matches, but I don't know for sure if this is the same cause :) > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. > The bug find credit goes to [~hachikuji] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9161) Close gaps in Streams configs documentation
[ https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9161: --- Fix Version/s: 2.6.0 > Close gaps in Streams configs documentation > --- > > Key: KAFKA-9161 > URL: https://issues.apache.org/jira/browse/KAFKA-9161 > Project: Kafka > Issue Type: Bug > Components: documentation, streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: beginner, newbie, newbie++ > Fix For: 2.6.0 > > > There are a number of Streams configs that aren't documented in the > "Configuring a Streams Application" section of the docs. As of 2.3 the > missing configs are: > # default.windowed.key.serde.inner ^ > # default.windowed.value.serde.inner ^ > # max.task.idle.ms > # rocksdb.config.setter. ^^ > # topology.optimization > # -upgrade.from- fixed > ^ these configs are also missing the corresponding DOC string > ^^ this one actually does appear on that page, but instead of being included > in the list of Streams configs it is for some reason under "Consumer and > Producer Configuration Parameters" ? > There are also a few configs whose documented name is slightly incorrect, as > it is missing the "default" prefix that appears in the actual code. The > "missing-default" configs are: > # key.serde > # value.serde > # timestamp.extractor -- This message was sent by Atlassian Jira (v8.3.4#803005)
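For readers cross-checking the documentation gaps listed in the ticket, a brief sketch of where these parameters are set in an application's configuration; the string keys are written out literally (rather than via `StreamsConfig` constants) to keep the snippet self-contained, and the values are illustrative only.

```java
import java.util.Properties;

public class StreamsConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("application.id", "configs-doc-example"); // required
        props.put("bootstrap.servers", "localhost:9092");   // required
        // Configs called out in the ticket as missing or misplaced in the docs:
        props.put("max.task.idle.ms", "1000");
        props.put("topology.optimization", "all");
        // Note the "default." prefix that the documentation dropped:
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.timestamp.extractor", "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");
        System.out.println(props);
    }
}
```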
[GitHub] [kafka] cadonna opened a new pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna opened a new pull request #9066: URL: https://github.com/apache/kafka/pull/9066 In PR #8962 we introduced a sentinel UNKNOWN_OFFSET to mark unknown offsets in checkpoint files. The sentinel was set to -2, which is the same value used for the sentinel LATEST_OFFSET that is used in subscriptions to signal that state stores have been used by an active task. Unfortunately, we missed skipping UNKNOWN_OFFSET when we compute the sum of the changelog offsets. If a task had only one state store and it did not restore anything before the next rebalance, the stream thread wrote -2 (i.e., UNKNOWN_OFFSET) into the subscription as the sum of the changelog offsets. During assignment, the leader interpreted the -2 as if the stream thread had run the task as active, although it might have run it as a standby. This misinterpretation of the sentinel value resulted in unexpected task assignments. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
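A minimal sketch of the fix described in the PR summary, assuming a sentinel of -2 as in the description; the constant name and the method are illustrative, not the exact Streams code.

```java
import java.util.Map;

public class ChangelogOffsetSum {
    // Sentinel written to checkpoint files for offsets that are not (yet) known; the value
    // -2 follows the PR description and collides with the LATEST_OFFSET sentinel used in
    // the subscription, which is why it must never leak into the sum below.
    static final long UNKNOWN_OFFSET = -2L;

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long sum = 0L;
        for (final long offset : changelogOffsets.values()) {
            if (offset == UNKNOWN_OFFSET) {
                continue; // skip the sentinel so a single unknown offset cannot look like LATEST_OFFSET
            }
            sum += offset;
        }
        return sum;
    }
}
```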
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663197156 Started 10x runs of the failing system test here: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4074/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
cadonna commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663197372 Call for review: @ableegoldman @mjsax @vvcephei @guozhangwang This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163911#comment-17163911 ] Bruno Cadonna commented on KAFKA-10287: --- [~chia7712] Sorry, I was not aware that you assigned this ticket to yourself. Could you please review the PR? > fix flaky streams/streams_standby_replica_test.py > - > > Key: KAFKA-10287 > URL: https://issues.apache.org/jira/browse/KAFKA-10287 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > {quote} > Module: kafkatest.tests.streams.streams_standby_replica_test > Class: StreamsStandbyTask > Method: test_standby_tasks_rebalance > {quote} > It pass occasionally on my local. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei opened a new pull request #9067: MINOR: Streams integration tests should not call exit
vvcephei opened a new pull request #9067: URL: https://github.com/apache/kafka/pull/9067 Fixes this issue: ``` Execution failed for task ':streams:unitTest'. > Process 'Gradle Test Executor 69' finished with non-zero exit value 134 ``` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9067: MINOR: Streams integration tests should not call exit
vvcephei commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663223325 Hey @mjsax, can you review this? It's weird that the error is reported from `unitTest`, but the only call path to the `Exit.exit()` method is through this integration test. It may not completely fix the issue, but it doesn't hurt to put this in for now and see what happens. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] soarez commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
soarez commented on pull request #9064: URL: https://github.com/apache/kafka/pull/9064#issuecomment-663252849 Thanks for review @mjsax. I've changed the merge base to `trunk` and believe I have addressed all your feedback so far. Please take another look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] soarez edited a comment on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies
soarez edited a comment on pull request #9064: URL: https://github.com/apache/kafka/pull/9064#issuecomment-663252849 Thanks for review @mjsax. I've changed the base branch to `trunk` and believe I have addressed all your feedback so far. Please take another look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9067: MINOR: Streams integration tests should not call exit
mjsax commented on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663255631 Is this the right fix? I thought we should replace `System.exit()` with `Exit.exit()` to address the issue? -- We call `System.exit()` in multiple system tests. -- And we hav This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
mjsax commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663256239 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
mjsax commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-663256489 @cadonna Should the PR be against `trunk`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax edited a comment on pull request #9067: MINOR: Streams integration tests should not call exit
mjsax edited a comment on pull request #9067: URL: https://github.com/apache/kafka/pull/9067#issuecomment-663255631 Is this the right fix? I thought we should replace `System.exit()` with `Exit.exit()` to address the issue? -- We call `System.exit()` in multiple system tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r459762279 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { Review comment: I can't comment above, but can the `poll` call itself throw a timeout exception? Or does it always just return no results in that case? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately Review comment: I think this makes sense, but it seems to make some assumptions about the internal implementation of the consumer that doesn't seem necessary here. Is it important to assert that the consumer can't possibly get a timeout exception here? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set changelogTopics = new HashSet<>(); -for (final StateStore stateStore : globalStateStores) { + +long deadlineMs = NO_DEADLINE; +final List storesToInitialize = new LinkedList<>(globalStateStores); + +while (!storesToInitialize.isEmpty()) { +// we remove and add back on failure to round-robin through all stores +final StateStore stateStore = storesToInitialize.remove(0); globalStoreNames.add(stateStore.name()); final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); -stateStore.init(globalProcessorContext, stateStore); + +try { +stateStore.init(globalProcessorContext, stateStore); +deadlineMs = NO_DEADLINE; +} catch (final RetryableErrorException retryableException) { +if (taskTimeoutMs == 0L) { +throw new StreamsException(retryableException.getCause()); Review comment: Why not just preserve the whole story of what happened, like `throw new StreamsException("Couldn't retry because timeout is set to zero", retryableException)`? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, restoreRecords.add(recordConverter.convert(record)); } } -offset = globalConsumer.position(topicPartition); +try { +offset = globalConsumer.position(topicPartition); +} catch (final TimeoutException error) { +// the `globalConsumer.position()` call should never block, because we know that we did +// a successful `position()` call above for the requested partition and thus the consumer +// should have a valid local position that it can return immediately + +// hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` +throw new IllegalStateException(error); +} + stateRestoreAdapter.restoreBatch(restoreRecords); stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); restoreCount += restoreRecords.size(); Review comment: Huh, interesting thought. Just to be clear, it looks like it would already block forever in this case today, right? Yeah, it does seem like we should implement a similar non-progress
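The review thread above is essentially about two patterns: preserving the original exception as the cause when a retry cannot be attempted, and retrying initialization work against a wall-clock deadline that is re-armed only when no progress is being made, round-robining through the remaining stores. As a hedged illustration of that pattern (not the actual `GlobalStateManagerImpl` code), here is a self-contained sketch; the names `StoreInitRetryableException`, `initStore`, and `taskTimeoutMs` are hypothetical, and `IllegalStateException` stands in for the Streams-specific exception type.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

final class RetryWithDeadlineSketch {

    // Hypothetical analogue of a retryable initialization failure.
    static class StoreInitRetryableException extends RuntimeException {
        StoreInitRetryableException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static void initializeAll(final List<String> storeNames,
                              final long taskTimeoutMs) throws InterruptedException {
        final Deque<String> toInitialize = new ArrayDeque<>(storeNames);
        long deadlineMs = -1L; // -1 means "no deadline armed"

        while (!toInitialize.isEmpty()) {
            final String store = toInitialize.removeFirst();
            try {
                initStore(store);      // may throw StoreInitRetryableException
                deadlineMs = -1L;      // progress was made, so disarm the deadline
            } catch (final StoreInitRetryableException e) {
                if (taskTimeoutMs == 0L) {
                    // keep the whole story: the retryable exception stays attached as the cause
                    throw new IllegalStateException(
                        "Could not initialize store " + store + " and retries are disabled", e);
                }
                final long nowMs = System.currentTimeMillis();
                if (deadlineMs == -1L) {
                    deadlineMs = nowMs + taskTimeoutMs;   // arm the deadline on the first failure
                } else if (nowMs >= deadlineMs) {
                    throw new IllegalStateException(
                        "Could not initialize store " + store + " within " + taskTimeoutMs + " ms", e);
                }
                toInitialize.addLast(store);  // re-queue so all stores are retried round-robin
                Thread.sleep(100L);           // fixed backoff, purely for the sketch
            }
        }
    }

    private static void initStore(final String storeName) {
        // placeholder for the real initialization work (e.g. stateStore.init(...))
    }
}
```

The key properties mirror the points raised in the comments: when the timeout is zero the failure is rethrown immediately but with the retryable exception preserved as the cause, the deadline is armed on the first failure and disarmed whenever a store initializes successfully, and failed stores are re-queued so one slow store does not starve the others.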
[jira] [Updated] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10287: - Priority: Blocker (was: Major) > fix flaky streams/streams_standby_replica_test.py > - > > Key: KAFKA-10287 > URL: https://issues.apache.org/jira/browse/KAFKA-10287 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > > {quote} > Module: kafkatest.tests.streams.streams_standby_replica_test > Class: StreamsStandbyTask > Method: test_standby_tasks_rebalance > {quote} > It passes occasionally on my local. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10287) fix flaky streams/streams_standby_replica_test.py
[ https://issues.apache.org/jira/browse/KAFKA-10287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10287: - Fix Version/s: 2.6.0 > fix flaky streams/streams_standby_replica_test.py > - > > Key: KAFKA-10287 > URL: https://issues.apache.org/jira/browse/KAFKA-10287 > Project: Kafka > Issue Type: Bug > Components: streams, system tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Blocker > Fix For: 2.6.0 > > > {quote} > Module: kafkatest.tests.streams.streams_standby_replica_test > Class: StreamsStandbyTask > Method: test_standby_tasks_rebalance > {quote} > It passes occasionally on my local. -- This message was sent by Atlassian Jira (v8.3.4#803005)