[jira] [Updated] (KAFKA-7583) Unable to close kafka producer if kafka not available
[ https://issues.apache.org/jira/browse/KAFKA-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vitalina Horyukova updated KAFKA-7583: -- Summary: Unable to close kafka producer if kafka not available (was: Producer force close doesn't work when all brokers is down.) > Unable to close kafka producer if kafka not available > - > > Key: KAFKA-7583 > URL: https://issues.apache.org/jira/browse/KAFKA-7583 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.11.0.0 > Environment: Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 > 14:43:09 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux > java 1.8.0_152 >Reporter: Vitalina Horyukova >Priority: Major > > Hi! > When all Kafka brokers are down, the thread which called {{KafkaProducer.close}} > gets stuck forever in the second join on {{KafkaProducer.ioThread}}, because > {{KafkaProducer.ioThread}} spins forever in the {{while}} loop in > {{Sender.maybeWaitForProducerId}}. The root cause of this is that > {{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} > throws an {{IOException}} on every iteration. > In the logs you can see this part repeating endlessly, every > {{retry.backoff.ms}}: > {code:java} > [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: > -1 rack: null) > [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: > -1 rack: null) > [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for > sending metadata request > [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at > kafka:9093. > [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | > producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx > disconnected > java.net.ConnectException: Connection refused > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109) > at > org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359) > at org.apache.kafka.common.network.Selector.poll(Selector.java:326) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432) > at > org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39) > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62) > at > org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) > at java.lang.Thread.run(Thread.java:748) > [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Node -1 disconnected. > [2018-11-01T16:19:47.585+03:00] WARN [kafka-producer-network-thread | > producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be > established. Broker may not be available. 
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | > producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting > InitProducerId response > java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed. > at > org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68) > at > org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) > at java.lang.Thread.run(Thread.java:748) > [2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | > producer-1] [] [] [] [Sender] Retry InitProducerIdRequest in 100ms. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
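For illustration, a minimal sketch that should reproduce the hang described above. The bootstrap address is assumed unreachable, and enabling idempotence is what makes the Sender wait for an InitProducerId response, which is the loop the io thread spins in:

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class CloseHangRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address that refuses connections, as in the report.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9093");
        // Idempotence makes the Sender loop in maybeWaitForProducerId.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Even a bounded close reportedly blocks forever on 0.11.0.0: after the
        // timeout expires, close() forces shutdown and then joins the io thread
        // a second time without a timeout, and that join never returns.
        producer.close(5, TimeUnit.SECONDS);
        System.out.println("close() returned");
    }
}
{code}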
[jira] [Commented] (KAFKA-7575) 'Error while writing to checkpoint file' Issue
[ https://issues.apache.org/jira/browse/KAFKA-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672795#comment-16672795 ] Sönke Liebau commented on KAFKA-7575: - [~nirm2009] any news on this? If not, I'd be tempted to close this issue, as the error message points at this being caused by external influences. > 'Error while writing to checkpoint file' Issue > -- > > Key: KAFKA-7575 > URL: https://issues.apache.org/jira/browse/KAFKA-7575 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.1 > Environment: Windows 10, Kafka 1.1.1 >Reporter: Dasun Nirmitha >Priority: Blocker > Attachments: Dry run error.rar > > > I'm currently testing a Java Kafka producer application coded to retrieve a > db value from a local mysql db and produce to a single topic. Locally I've > got a Zookeeper server and a Kafka single broker running. > My issue is I need to produce this from the Kafka producer each second, and > that works for around 2 hours until the broker throws an 'Error while writing to > checkpoint file' and shuts down. Producing with a 1 minute interval works > with no issues, but unfortunately I need the produce interval to be 1 second. > I have attached a rar containing screenshots of the errors thrown by the > broker and my application. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
[ https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Liju reassigned KAFKA-5054: --- Assignee: Liju > ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized > > > Key: KAFKA-5054 > URL: https://issues.apache.org/jira/browse/KAFKA-5054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: Liju >Priority: Critical > > {{putIfAbsent}} and {{delete}} should be synchronized as they involve at > least 2 operations on the underlying store and may result in inconsistent > results if someone were to query via IQ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
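To make the race concrete, here is a simplified stand-in (not the actual ChangeLoggingKeyValueByteStore code) showing why compound read-then-write operations need the lock:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of a store whose putIfAbsent/delete are compound operations.
class SketchStore {
    private final Map<String, byte[]> inner = new HashMap<>();

    // Without 'synchronized', another thread (e.g. an IQ reader) can observe
    // or modify the store between the get() and the put(), so two concurrent
    // putIfAbsent calls could both see null and both write.
    public synchronized byte[] putIfAbsent(String key, byte[] value) {
        byte[] existing = inner.get(key); // operation 1: read
        if (existing == null) {
            inner.put(key, value);        // operation 2: write
        }
        return existing;
    }

    public synchronized byte[] delete(String key) {
        byte[] existing = inner.get(key); // operation 1: read the old value
        inner.remove(key);                // operation 2: remove it
        return existing;
    }

    public synchronized byte[] get(String key) {
        return inner.get(key);
    }
}
{code}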
[jira] [Commented] (KAFKA-7575) 'Error while writing to checkpoint file' Issue
[ https://issues.apache.org/jira/browse/KAFKA-7575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672832#comment-16672832 ] Dasun Nirmitha commented on KAFKA-7575: --- Hi Sonke, I ran the same application on an Ubuntu 12 VM and was able to run it successfully without errors for around 8 hours (at which time I manually stopped the VM). Later I also found a Stack Overflow thread suggesting that this can be caused by Kafka's use of the Files.deleteIfExists(file.toPath) method, which can cause compatibility issues on Windows. [https://stackoverflow.com/questions/47168342/kafka-1-0-stops-with-fatal-shutdown-error-logs-directory-failed] https://issues.apache.org/jira/browse/KAFKA-6075 I believe this is still a bug which should be fixed in a future Kafka release. > 'Error while writing to checkpoint file' Issue > -- > > Key: KAFKA-7575 > URL: https://issues.apache.org/jira/browse/KAFKA-7575 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.1 > Environment: Windows 10, Kafka 1.1.1 >Reporter: Dasun Nirmitha >Priority: Blocker > Attachments: Dry run error.rar > > > I'm currently testing a Java Kafka producer application coded to retrieve a > db value from a local mysql db and produce to a single topic. Locally I've > got a Zookeeper server and a Kafka single broker running. > My issue is I need to produce this from the Kafka producer each second, and > that works for around 2 hours until the broker throws an 'Error while writing to > checkpoint file' and shuts down. Producing with a 1 minute interval works > with no issues, but unfortunately I need the produce interval to be 1 second. > I have attached a rar containing screenshots of the errors thrown by the > broker and my application. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
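For reference, a minimal sketch of the produce-every-second pattern described above; the topic name and the fetchDbValue() stub (standing in for the MySQL query) are hypothetical:

{code:java}
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OneSecondProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send one record per second, mirroring the reporter's setup.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> producer.send(new ProducerRecord<>("db-values", fetchDbValue())),
                0, 1, TimeUnit.SECONDS);
    }

    private static String fetchDbValue() {
        return "42"; // stand-in for the local MySQL lookup
    }
}
{code}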
[jira] [Commented] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets
[ https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672962#comment-16672962 ] Robert V commented on KAFKA-7556: - I didn't see this issue with a non-compacted topics, but that topic has a very long retention set so messages were never deleted. > KafkaConsumer.beginningOffsets does not return actual first offsets > --- > > Key: KAFKA-7556 > URL: https://issues.apache.org/jira/browse/KAFKA-7556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0 >Reporter: Robert V >Priority: Critical > Labels: documentation, usability > Fix For: 2.2.0 > > > h2. Description of the problem > The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` > claims in its Javadoc documentation that it would 'Get the first offset for > the given partitions.'. > I used it with a compacted topic, and it always returned offset 0 for all > partitions. > Not sure if using a compacted topic actually matters, but I'm enclosing this > information anyway. > Given a Kafka topic with retention set, and old log files being removed as a > result of that, the effective start offset of those partitions move further; > it simply will be greater than offset 0. > However, calling the `beginningOffsets` method always returns offset 0 as the > first offset. > In contrast, when the method > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called > with a timestamp of 0L (UNIX epoch 1st Jan, 1970), it correctly returns the > effective start offsets for each partitions. > Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: > {code:java} > {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, > test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, > test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, > test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, > test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, > test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, > test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, > test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, > test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, > test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, > test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, > test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, > test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, > test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, > test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, > test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, > test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, > test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, > test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, > test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, > test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, > test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, > test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, > test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, > test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0} > {code} > Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`: > 
{code:java} > {test.topic-87=(timestamp=1511264434285, offset=289), > test.topic-54=(timestamp=1511265134993, offset=45420), > test.topic-21=(timestamp=1511265534207, offset=63643), > test.topic-79=(timestamp=1511270338275, offset=380750), > test.topic-46=(timestamp=1511266883588, offset=266379), > test.topic-13=(timestamp=1511265900538, offset=98512), > test.topic-70=(timestamp=1511266972452, offset=118522), > test.topic-37=(timestamp=1511264396370, offset=763), > test.topic-12=(timestamp=1511265504886, offset=61108), > test.topic-95=(timestamp=1511289492800, offset=847647), > test.topic-62=(timestamp=1511265831298, offset=68299), > test.topic-29=(timestamp=1511278767417, offset=548361), > test.topic-4=(timestamp=1511269316679, offset=144855), > test.topic-88=(timestamp=1511265608468, offset=107831), > test.topic-55=(timestamp=1511267449288, offset=129241), > test.topic-22=(timestamp=1511283134114, offset=563095), > test.topic-80=(timestamp=1511277334877, offset=534859), > test.topic-47=(timestamp=1511265530689, offset=71608), > test.topic-14=(timestamp=1511266308829, offset=80962), > test.topic-71=(timestamp=1511265474740, offset=83607), > test.
[jira] [Comment Edited] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets
[ https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672962#comment-16672962 ] Robert V edited comment on KAFKA-7556 at 11/2/18 11:23 AM: --- I didn't see this issue with a non-compacted topic, but that topic has a very long retention set so messages were never deleted. was (Author: rob_v): I didn't see this issue with a non-compacted topics, but that topic has a very long retention set so messages were never deleted. > KafkaConsumer.beginningOffsets does not return actual first offsets > --- > > Key: KAFKA-7556 > URL: https://issues.apache.org/jira/browse/KAFKA-7556 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0 >Reporter: Robert V >Priority: Critical > Labels: documentation, usability > Fix For: 2.2.0 > > > h2. Description of the problem > The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` > claims in its Javadoc documentation that it would 'Get the first offset for > the given partitions.'. > I used it with a compacted topic, and it always returned offset 0 for all > partitions. > Not sure if using a compacted topic actually matters, but I'm enclosing this > information anyway. > Given a Kafka topic with retention set, and old log files being removed as a > result of that, the effective start offset of those partitions move further; > it simply will be greater than offset 0. > However, calling the `beginningOffsets` method always returns offset 0 as the > first offset. > In contrast, when the method > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called > with a timestamp of 0L (UNIX epoch 1st Jan, 1970), it correctly returns the > effective start offsets for each partitions. 
> Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: > {code:java} > {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, > test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, > test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, > test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, > test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, > test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, > test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, > test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, > test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, > test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, > test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, > test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, > test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, > test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, > test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, > test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, > test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, > test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, > test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, > test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, > test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, > test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, > test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, > test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, > test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0} > {code} > Output of using > `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`: > {code:java} > {test.topic-87=(timestamp=1511264434285, offset=289), > test.topic-54=(timestamp=1511265134993, offset=45420), > test.topic-21=(timestamp=1511265534207, offset=63643), > test.topic-79=(timestamp=1511270338275, offset=380750), > test.topic-46=(timestamp=1511266883588, offset=266379), > test.topic-13=(timestamp=1511265900538, offset=98512), > test.topic-70=(timestamp=1511266972452, offset=118522), > test.topic-37=(timestamp=1511264396370, offset=763), > test.topic-12=(timestamp=1511265504886, offset=61108), > test.topic-95=(timestamp=1511289492800, offset=847647), > test.topic-62=(timestamp=1511265831298, offset=68299), > test.topic-29=(timestamp=1511278767417, offset=548361), > test.topic-4=(timestamp=1511269316679, offset=144855), > test.topic-88=(timestamp=1511265608468, offset=107831), > test.topic-55=(timestamp=1511267449288, offset=129241), > test.topic-22=(timestamp=1511283134114, offset=563095), > test.topic-80=(timestamp=151127733
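For reference, the two calls compared in the report, side by side; a sketch assuming a consumer pointed at the affected cluster and the topic name from the output above:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class StartOffsetComparison {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor("test.topic").stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());

            // Reportedly returns 0 for every partition, even after retention deletes.
            Map<TopicPartition, Long> beginning = consumer.beginningOffsets(partitions);

            // Reportedly returns the effective first offsets when asked for timestamp 0L.
            Map<TopicPartition, Long> timestamps = new HashMap<>();
            partitions.forEach(tp -> timestamps.put(tp, 0L));
            Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(timestamps);

            System.out.println("beginningOffsets: " + beginning);
            System.out.println("offsetsForTimes(0L): " + byTime);
        }
    }
}
{code}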
[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673137#comment-16673137 ] Daren Thomas commented on KAFKA-7577: - {quote}Can you verify that Y.leftJoin(Z) produces an output record that contains B ? {quote} I can try. It is correct that B comes from input Z. Table Y does not contribute anything to the final output. Its only purpose is to link Table X with Table Z. So the output of Y.leftJoin(Z) is a key capable of joining with X and the relevant information from Z. Does that make sense? {quote}Can you reproduce the issue using `TopologyTestDriver`? {quote} My apologies, but I'm not familiar with the TopologyTestDriver. If I can figure out how to use it, I will certainly pass that information along. > Semantics of Table-Table Join with Null Message Are Incorrect > - > > Key: KAFKA-7577 > URL: https://issues.apache.org/jira/browse/KAFKA-7577 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Daren Thomas >Priority: Major > > Observed behavior of Table-Table join with a Null Message does not match the > semantics described in the documentation > ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]). > The expectation is: > * Message A results in [A, null] from the Left Join > * Message null (tombstone) results in null (tombstone) from the Left Join > The observed behavior was that the null (tombstone) message did not pass > through the Left Join to the output topic as expected. This behavior was > observed with and without caching enabled, against a test harness, and > against a local Confluent 5.0.0 platform. It was also observed that the > KTableKTableLeftJoinProcessor.process() function was not called for the null > (tombstone) message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673371#comment-16673371 ] chris riley commented on KAFKA-7509: Just commenting to say these log warnings have been a thorn in my side for some time now. I was about to create a PR to do the same thing as [~rhauch] did. As a workaround I have set loglevel to ERROR in my logback for {noformat} org.apache.kafka.clients.admin.AdminClientConfig org.apache.kafka.clients.consumer.ConsumerConfig org.apache.kafka.clients.producer.ProducerConfig{noformat} But I would rather not have to do this. > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673439#comment-16673439 ] Matthias J. Sax commented on KAFKA-7580: Unit tests are not intended to fail if you run them as the root user. I am fine with improving the test as you suggested. It just seems to be a very uncommon request that unit tests need to pass as the root user... I would never run unit tests as the root user – same for any other piece of "regular" software. Running software as root should be limited as much as possible in general, and usually it's not required. > Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when > run as root user > -- > > Key: KAFKA-7580 > URL: https://issues.apache.org/jira/browse/KAFKA-7580 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2 > Environment: Ubuntu 16.04.3 LTS >Reporter: Sarvesh Tamba >Priority: Minor > > Created a non-root user and ran the following command to execute the failing > unit test:- > ./gradlew streams:unitTest --tests > org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir > For a root user, the test case fails:- > = > > Task :streams:testClasses UP-TO-DATE > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED > java.lang.AssertionError: Expected exception: > org.apache.kafka.streams.errors.ProcessorStateException > 1 test completed, 1 failed > > Task :streams:unitTest FAILED > FAILURE: Build failed with an exception. > * What went wrong: > Execution failed for task ':streams:unitTest'. > > There were failing tests. See the report at: > > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html > * Try: > Run with --stacktrace option to get the stack trace. Run with --info or > --debug option to get more log output. Run with --scan to get full insights. > * Get more help at https://help.gradle.org > BUILD FAILED in 20s > 26 actionable tasks: 2 executed, 24 up-to-date > = > However, for a non-root user the test case passes:- > = > > Task :streams:testClasses > > Task :streams:unitTest > org.apache.kafka.streams.state.internals.RocksDBStoreTest > > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED > BUILD SUCCESSFUL in 45s > 26 actionable tasks: 4 executed, 22 up-to-date > = > The failing unit test - > "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary > file directory and sets it as readOnly. The unit test is intended to throw an > exception - "ProcessorStateException", when the readOnly temporary file > directory is opened/accessed. > By default, non-root users are not allowed to open/access a readOnly file > directory, and the unit test rightly throws an error/exception (which is > the intention of the unit test, and it passes for non-root users). > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent > mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied > > sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/ > ls: cannot access '/tmp/readOnlyDir/..': Permission denied > ls: cannot access '/tmp/readOnlyDir/.': Permission denied > ls: cannot access '/tmp/readOnlyDir/kid': Permission denied > ls: cannot access '/tmp/readOnlyDir/child': Permission denied > total 0 > d? ? ? ? ? ? ./ > d? ? ? ? ? ? ../ > d? 
? ? ? ? ? child/ > d? ? ? ? ? ? kid/ > However, by default, the root user can access any file in the system:- > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 112 > dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > > root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent > > root@p006vm18:/tmp# ll /tmp/readOnlyDir/ > total 116 > dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./ > drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../ > drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/ > drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/ > drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/ > Hence the unit test does not throw an exception - "ProcessorStateException" > when the readOnly temporary file directory is opened, and the unit test > rightly fails for a root user. > Two approaches for resolving this failing unit test case:- > 1.) Run the unit tests as n
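One hypothetical way to handle the root-user case without weakening the assertion for everyone else is a JUnit assumption that skips the test when running as root; this is a sketch, not the actual patch:

{code:java}
import org.junit.Assume;
import org.junit.Test;

public class RocksDBStoreTestSketch {
    @Test
    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
        // File permissions do not constrain the superuser, so the read-only
        // directory cannot provoke the expected exception under root.
        Assume.assumeFalse("skipped when running as root",
                "root".equals(System.getProperty("user.name")));
        // ... original test body: create a read-only dir and expect
        // ProcessorStateException when the store tries to open it ...
    }
}
{code}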
[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673444#comment-16673444 ] Matthias J. Sax commented on KAFKA-7577: TopologyTestDriver docs: [https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html] > Semantics of Table-Table Join with Null Message Are Incorrect > - > > Key: KAFKA-7577 > URL: https://issues.apache.org/jira/browse/KAFKA-7577 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Daren Thomas >Priority: Major > > Observed behavior of Table-Table join with a Null Message does not match the > semantics described in the documentation > ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).] > The expectation is: > * Message A results in [A, null] from the Left Join > * Message null (tombstone) results in null (tombstone) from the Left Join > The observed behavior was that the null (tombstone) message did not pass > through the Left Join to the output topic like expected. This behavior was > observed with and without caching enabled, against a test harness, and > against a local Confluent 5.0.0 platform. It was also observed that the > KTableKTableLeftJoinProcessor.process() function was not called for the null > (tombstone) message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
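For anyone picking this up, a minimal sketch of driving a tombstone through a KTable-KTable left join with TopologyTestDriver, simplified to a single X/Z join rather than the reporter's three-table chain; topic names and serdes are assumptions, API as in the 2.0 test-utils:

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class TombstoneJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> x = builder.table("X", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, String> z = builder.table("Z", Consumed.with(Serdes.String(), Serdes.String()));
        x.leftJoin(z, (xv, zv) -> xv + "," + zv)
         .toStream()
         .to("out", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tombstone-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // not contacted by the driver

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("X", "k", "A"));            // expect ("k", "A,null")
            driver.pipeInput(factory.create("X", "k", (String) null));  // tombstone: expect ("k", null)
            System.out.println(driver.readOutput("out", new StringDeserializer(), new StringDeserializer()));
            System.out.println(driver.readOutput("out", new StringDeserializer(), new StringDeserializer()));
        }
    }
}
{code}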
[jira] [Created] (KAFKA-7584) StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String
Matthias J. Sax created KAFKA-7584: -- Summary: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String Key: KAFKA-7584 URL: https://issues.apache.org/jira/browse/KAFKA-7584 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.0.0 Reporter: Matthias J. Sax Assignee: Matthias J. Sax Setting {quote}{{props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);}} {{props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");}} {quote} results in {quote}{{java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer}}{{at org.apache.kafka.streams.StreamsConfig.checkIfUnexpectedUserSpecifiedConsumerConfig(StreamsConfig.java:875)}} {{ at org.apache.kafka.streams.StreamsConfig.getProducerConfigs(StreamsConfig.java:1071)}} {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
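A minimal sketch of the failure and the obvious workaround (application id and client id are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class MaxInFlightConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // On 2.0.0 this throws ClassCastException when the producer configs are
        // built, because the EOS sanity check casts the raw value to Integer:
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
        // Workaround until the fix lands: supply an Integer instead.
        // props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 3);

        new StreamsConfig(props).getProducerConfigs("client-1"); // triggers the cast
    }
}
{code}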
[jira] [Created] (KAFKA-7585) Avoid classloader when passing serializers to KafkaProducer constructors
Sherwin Chiu created KAFKA-7585: --- Summary: Avoid classloader when passing serializers to KafkaProducer constructors Key: KAFKA-7585 URL: https://issues.apache.org/jira/browse/KAFKA-7585 Project: Kafka Issue Type: Improvement Components: producer Affects Versions: 2.0.0 Reporter: Sherwin Chiu When using {quote}{{public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)}} {quote} I was surprised to run into the following error. {quote}{{Class org.apache.kafka.common.serialization.StringSerializer could not be found.}} {quote} Is there a reason why KafkaProducer needs to reload the class I already passed in? I was expecting this constructor to function similarly to {quote}{{public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)}} {quote} Why not convert/cast the Properties to a Map and add the serializers just like the Map config constructor? I see that it's trying to keep the Properties construct. Is there a reason for this? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
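A possible workaround along the lines the reporter suggests, assuming (as the question implies) that the Map-based constructor behaves as expected in their environment: copy the Properties into a Map yourself before constructing the producer:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFromProperties {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // Copy the Properties into a Map and use the Map-based constructor,
        // passing the serializer instances directly.
        Map<String, Object> configs = new HashMap<>();
        props.forEach((k, v) -> configs.put((String) k, v));

        KafkaProducer<String, String> producer =
                new KafkaProducer<>(configs, new StringSerializer(), new StringSerializer());
        producer.close();
    }
}
{code}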
[jira] [Commented] (KAFKA-7584) StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String
[ https://issues.apache.org/jira/browse/KAFKA-7584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673614#comment-16673614 ] ASF GitHub Bot commented on KAFKA-7584: --- mjsax opened a new pull request #5874: KAFKA-7584: StreamsConfig throws ClassCastException if max.in.flight.request.per.connect is specified as String URL: https://github.com/apache/kafka/pull/5874 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamsConfig throws ClassCastException if max.in.flight.request.per.connect > is specified as String > --- > > Key: KAFKA-7584 > URL: https://issues.apache.org/jira/browse/KAFKA-7584 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Minor > > Setting > {quote}{{props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);}} > {{props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");}} > {quote} > results in > {quote}{{java.lang.ClassCastException: java.lang.String cannot be cast to > java.lang.Integer}}{{at > org.apache.kafka.streams.StreamsConfig.checkIfUnexpectedUserSpecifiedConsumerConfig(StreamsConfig.java:875)}} > {{ at > org.apache.kafka.streams.StreamsConfig.getProducerConfigs(StreamsConfig.java:1071)}} > {quote} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7586) Connector status endpoint HTTP response codes
Rod Cordova created KAFKA-7586: -- Summary: Connector status endpoint HTTP response codes Key: KAFKA-7586 URL: https://issues.apache.org/jira/browse/KAFKA-7586 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.1.1 Reporter: Rod Cordova It appears that the HTTP response code for the status endpoint is always 200/OK. It would be extremely helpful if the response code was NOT 200/OK if there are FAILED connector states or task states. Having to parse the response body JSON in order to determine actual health adds unnecessary burden on the upstream client doing the health check. Based on the existing documentation you can see that even though there is a FAILED worker task the HTTP response code is 200/OK ([https://docs.confluent.io/current/connect/references/restapi.html#get--connectors-(string-name)-status]) We run these connectors in Docker containers and have resorted to Docker health checks (calling a simple script to parse the JSON status response body) but would much prefer to leverage the HTTP response code as that is the lowest common denominator supported by most proxies/load balancers etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
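For reference, a sketch of the kind of container health check this forces today (hypothetical host, port, and connector name; a crude string match stands in for real JSON parsing):

{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.stream.Collectors;

// Exit non-zero when the status JSON reports any FAILED connector or task
// state, so a Docker HEALTHCHECK can flag the container as unhealthy.
public class ConnectorHealthCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8083/connectors/my-connector/status");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        // The endpoint answers 200/OK even for failed tasks, so the body
        // has to be inspected.
        try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            String body = in.lines().collect(Collectors.joining());
            if (body.contains("\"state\":\"FAILED\"")) {
                System.exit(1); // unhealthy
            }
        }
        System.exit(0); // healthy
    }
}
{code}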
[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction
[ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673706#comment-16673706 ] Sebastian Puzoń commented on KAFKA-7531: Hi Jason, here's commit id from broker: _INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)_ FYI, the NPE issue may be related to session timeout discrepancy between broker/client, On the broker I've had 20s whereas on the client side it was 60s. I'm pretty sure this was also a reason of Streams thread leak on the application side that I could also observe, it was slow but eventually all streams threads in JVM have been shut down. I'm guessing ... maybe this leads to NPE issue describe in this ticket? Since I corrected session settings for broker/stream app I don't observe stream thread leak in client app, I didn't have time to re-test yet, I got NPE after few hours, I will try next week. > NPE NullPointerException at TransactionCoordinator handleEndTransaction > --- > > Key: KAFKA-7531 > URL: https://issues.apache.org/jira/browse/KAFKA-7531 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Sebastian Puzoń >Priority: Critical > Fix For: 2.1.1, 2.0.2 > > > Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper. > Streams Application 4 instances, each has 5 Streams threads, total 20 stream > threads. > I observe NPE NullPointerException at coordinator broker which causes all > application stream threads shutdown, here's stack from broker: > {code:java} > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member > elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe > in group elo > g_agg has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance > group elog_agg with old generation 49 (__consumer_offsets-21) > (kafka.coordinator.gro > up.GroupCoordinator) > [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group > elog_agg generation 50 (__consumer_offsets-21) > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from > leader for group elog_agg for generation 50 > (kafka.coordinator.group.GroupCoordina > tor) > [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized > transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on > partition _ > _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator) > [ > [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request > {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr > ue} (kafka.server.KafkaApis) > java.lang.NullPointerException > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383) > at scala.util.Either$RightProjection.flatMap(Either.scala:702) > at > kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437) > 
at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619) > at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129) > at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70) > at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110) > at > kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251
[jira] [Comment Edited] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction
[ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673706#comment-16673706 ] Sebastian Puzoń edited comment on KAFKA-7531 at 11/2/18 9:32 PM: - Hi Jason, here's the commit id from the broker: _INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)_ FYI, the NPE issue may be related to a session timeout discrepancy between broker and client: on the broker I had 20s, whereas on the client side it was 60s. I'm pretty sure this was also a reason for the Streams thread leak on the application side that I could also observe; it was slow, but eventually all stream threads in the JVM were shut down. I'm guessing ... maybe this leads to the NPE issue described in this ticket? Since I corrected the session settings for the broker/stream app I don't observe the stream thread leak in the client app nor the NPE. I didn't have time to re-test yet; I will try next week. was (Author: spuzon): Hi Jason, here's commit id from broker: _INFO Kafka commitId : 3402a8361b734732 (org.apache.kafka.common.utils.AppInfoParser)_ FYI, the NPE issue may be related to session timeout discrepancy between broker/client, On the broker I've had 20s whereas on the client side it was 60s. I'm pretty sure this was also a reason of Streams thread leak on the application side that I could also observe, it was slow but eventually all streams threads in JVM have been shut down. I'm guessing ... maybe this leads to NPE issue describe in this ticket? Since I corrected session settings for broker/stream app I don't observe stream thread leak in client app, I didn't have time to re-test yet, I got NPE after few hours, I will try next week. > NPE NullPointerException at TransactionCoordinator handleEndTransaction > --- > > Key: KAFKA-7531 > URL: https://issues.apache.org/jira/browse/KAFKA-7531 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Sebastian Puzoń >Priority: Critical > Fix For: 2.1.1, 2.0.2 > > > Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper. > Streams Application 4 instances, each has 5 Streams threads, total 20 stream > threads. 
> I observe NPE NullPointerException at coordinator broker which causes all > application stream threads shutdown, here's stack from broker: > {code:java} > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member > elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe > in group elo > g_agg has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance > group elog_agg with old generation 49 (__consumer_offsets-21) > (kafka.coordinator.gro > up.GroupCoordinator) > [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group > elog_agg generation 50 (__consumer_offsets-21) > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from > leader for group elog_agg for generation 50 > (kafka.coordinator.group.GroupCoordina > tor) > [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized > transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on > partition _ > _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator) > [ > [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request > {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr > ue} (kafka.server.KafkaApis) > java.lang.NullPointerException > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383) > at scala.util.Either$RightProjection.flatMap(Either.scala:702) > at > kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
[jira] [Commented] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers
[ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673801#comment-16673801 ] ASF GitHub Bot commented on KAFKA-7576: --- rajinisivaram opened a new pull request #5875: KAFKA-7576: Fix shutdown of replica fetcher threads URL: https://github.com/apache/kafka/pull/5875 ReplicaFetcherThread.shutdown attempts to close the fetcher's Selector while the thread is running. This is unsafe and can result in `Selector.close()` failing with an exception. The exception is caught and logged at debug level, but this can lead to a socket leak if the shutdown is due to a dynamic config update rather than broker shutdown. This PR changes the shutdown logic to close the Selector after the replica fetcher thread is shut down, with a wakeup() and a flag to terminate blocking sends first. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Dynamic update of replica fetcher threads may fail to start/close fetchers > -- > > Key: KAFKA-7576 > URL: https://issues.apache.org/jira/browse/KAFKA-7576 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.2, 2.1.1, 2.0.2 > > > KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown > sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers > can now throw an exception because the Selector may be closed on one thread > while data is being written on another. KAFKA-7464 changed this > behaviour for 2.0.1 and 2.1.0. The exception during close() is now logged and > not propagated to avoid exceptions during broker shutdown. > When a config update notification of `num.replica.fetchers` is processed, > partitions are migrated as necessary to increase or decrease the number of > fetcher threads. Existing fetchers are shut down first before new ones are > created. This migration is performed on the thread processing the ZK change > notification. The shutdown of the Selector of existing fetchers is not safe, > since the replica fetcher thread may be processing data at the time using the > same Selector. > Without the fix from KAFKA-7464, another update of the config or a broker > restart is required to restart the replica fetchers after a dynamic config > update if shutdown encounters an exception. 
> Exception stack trace: > {code:java} > java.lang.IllegalArgumentException > at java.nio.Buffer.position(Buffer.java:244) > at sun.nio.ch.IOUtil.write(IOUtil.java:68) > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) > at > org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210) > at > org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160) > at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718) > at > org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70) > at org.apache.kafka.common.network.Selector.doClose(Selector.java:748) > at org.apache.kafka.common.network.Selector.close(Selector.java:736) > at org.apache.kafka.common.network.Selector.close(Selector.java:698) > at org.apache.kafka.common.network.Selector.close(Selector.java:314) > at > org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533) > at > kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) > at > kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90) > at > kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86) > at > kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76) > at > kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) > at scala.collection.mutable.HashM
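A simplified sketch of the shutdown ordering the PR description outlines (signal first, join, close last); this is a stand-in, not the actual ReplicaFetcherThread code:

{code:java}
import java.io.IOException;
import java.nio.channels.Selector;

// The selector is only closed after the fetcher thread has fully exited,
// so no other thread can touch it mid-poll.
class FetcherLoop extends Thread {
    private final Selector selector;
    private volatile boolean running = true;

    FetcherLoop(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select(1000); // poll; wakeup() unblocks this call
                // ... process ready keys / fetch responses ...
            }
        } catch (IOException e) {
            // log and fall through to exit the loop
        }
    }

    void shutdown() throws InterruptedException, IOException {
        running = false;   // flag to terminate blocking work
        selector.wakeup(); // unblock a pending select/send
        join();            // wait for the loop to exit
        selector.close();  // now safe: no concurrent use of the selector
    }
}
{code}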
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673826#comment-16673826 ] ASF GitHub Bot commented on KAFKA-7509: --- rhauch opened a new pull request #5876: KAFKA-7509: Changed to avoid passing non-applicable properties to producer, consumer, and admin client URL: https://github.com/apache/kafka/pull/5876 The producer, consumer, and admin client log any properties that are supplied but unused. Previously, Connect would pass many of its worker configuration properties into the producer, consumer, and admin client used for internal topics, resulting in lots of log warnings about unused config properties. With this change, Connect attempts to filter out the worker’s configuration properties that are not applicable to the producer, consumer, or admin client used for _internal_ topics. (Connect is already including only producer and consumer properties when creating those clients for connectors, since those properties are prefixed in the worker config.) For the most part, this is relatively straightforward, since there are some top-level worker-specific properties that can be removed, and most extension-specific properties have Connect-specific prefixes. Unfortunately, the REST extension is the only type of Connect extension that uses unprefixed properties from the worker config, so it is not possible to remove those from the properties passed to the producer, consumer, and admin clients. Hopefully, REST extensions aren't prevalent yet, and this will mean most users may not see any warnings about unused properties in the producer, consumer, and admin client. Removing the Connect worker configs is one step. The other is for the producer to remove any properties that are specific to the consumer and admin client. Likewise, for the consumer we have to remove any properties that are specific to the producer and admin client, and for the admin client remove any properties that are specific to the producer and consumer. Note that any property that is unknown (e.g., properties for REST extension, interceptors, metric reporters, serdes, partitioners, etc.) must be passed to the producer, consumer, and admin client. All of these — except for the REST extension properties — should indeed be used by the producer and consumer. But, since the admin client only supports metric reporters, any properties for interceptors, serdes, partitioners and REST extension will also be logged as unused. This is about the best we can do at this point. All of this filtering logic was added to the `ConnectUtils` class, allowing the logic to be easily unit tested. All changes are limited to Kafka Connect, and will work with all client and Connect extensions (passing them to the clients if they are unknown). This supersedes #5867 and #5802. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673828#comment-16673828 ] ASF GitHub Bot commented on KAFKA-7509: --- rhauch closed pull request #5867: KAFKA-7509: Logging unused configs as DEBUG rather than WARN URL: https://github.com/apache/kafka/pull/5867 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 9cf13dd35df..82dbb8a7153 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -284,7 +284,7 @@ private void logAll() { */ public void logUnused() { for (String key : unused()) -log.warn("The configuration '{}' was supplied but isn't a known config.", key); +log.debug("The configuration '{}' was supplied but isn't a known config.", key); } /** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673834#comment-16673834 ] Randall Hauch commented on KAFKA-7509: -- Since changing the unused property log messages to debug was not the correct change, I closed that PR and took yet another approach that just filters out worker properties before passing them to the producer, consumer, and admin clients. Previously, Connect would pass many of its worker configuration properties into the producer, consumer, and admin client used for internal topics, resulting in lots of log warnings about unused config properties. With [this change|https://github.com/apache/kafka/pull/5876], Connect now attempts to filter out the worker’s configuration properties that are not applicable to the producer, consumer, or admin client used for internal topics. (Connect is already including only producer and consumer properties when creating those clients for connectors, since those properties are prefixed in the worker config.) For the most part, this is relatively straightforward, since there are some top-level worker-specific properties that can be removed, and most extension-specific properties have Connect-specific prefixes. Unfortunately, the REST extension is the only type of Connect extension that uses unprefixed properties from the worker config, so it is not possible to remove those from the properties passed to the producer, consumer, and admin clients. Hopefully, REST extensions aren't prevalent yet, and this will mean most users may not see any warnings about unused properties in the producer, consumer, and admin client. Removing the Connect worker configs is one step. The other is for the producer to remove any properties that are specific to the consumer and admin client. Likewise, for the consumer we have to remove any properties that are specific to the producer and admin client, and for the admin client remove any properties that are specific to the producer and consumer. Note that any property that is unknown (e.g., properties for REST extension, interceptors, metric reporters, serdes, partitioners, etc.) must be passed to the producer, consumer, and admin client. All of these — except for the REST extension properties — should indeed be used by the producer and consumer. But, since the admin client only supports metric reporters, any properties for interceptors, serdes, partitioners and REST extension will also be logged as unused. This is about the best we can do at this point. All of this filtering logic was added to the ConnectUtils class, allowing the logic to be easily unit tested. All changes are limited to Kafka Connect, and will work with all client and Connect extensions (passing them to the clients if they are unknown). This still does not go as far as I'd like. It'd be ideal if Connect could use a RecordingMap to know which properties were used across _all_ of the clients and the worker itself, and to then log _only those_ properties that were never used at all. This is beyond the scope of this issue, however, and can be addressed in a follow-on issue that will require changes to AbstractConfig. 
> Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: clients, KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
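A sketch of the workaround named in the issue description: hand each client only the keys its ConfigDef knows about. The actual PR is more nuanced, since unknown properties such as interceptor or metric-reporter configs still have to be passed through:

{code:java}
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerConfig;

// Keep only the worker config entries that ProducerConfig's ConfigDef knows,
// so AbstractConfig has nothing to flag as unused. The same idea applies with
// ConsumerConfig.configNames() and AdminClientConfig.configNames().
public class KnownConfigFilter {
    public static Map<String, Object> producerOnly(Map<String, Object> workerConfigs) {
        return workerConfigs.entrySet().stream()
                .filter(e -> ProducerConfig.configNames().contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
{code}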
[jira] [Commented] (KAFKA-6890) Add connector level configurability for producer/consumer client configs
[ https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673918#comment-16673918 ] Peter M Elias commented on KAFKA-6890: -- I found this issue after reading through the source to determine if this was possible, and imagined a very similar patch to the one proposed here. We have a variety of use cases for this capability. They include throughput tuning, variations in max message sizes, and overriding partitioner implementations. I took a look at the current implementation and the first thing that jumped out was that in order to support this, the broker connection model changes from a single I/O thread per worker to needing a dedicated producer/consumer instance for each overridden connector. While I don't think this is necessarily a bad tradeoff for the config flexibility, it does mean more I/O threads and more network connections, and while a small deployment with few connectors may not notice a difference, a larger deployment choosing to run many overridden connectors may need to be warned of the possible increase in overhead. In regards to the possible duplication [~cricket007] has mentioned for KAFKA-4159, I found [this line|https://github.com/apache/kafka/pull/2548/files#diff-316d2c222b623ee65e8065863bf4b9ceR368] in the proposed implementation to be less elegant than [~Natengall]'s approach, as it causes state to accumulate on the `producerProps` map, which could cause nondeterministic application of prior configuration to connectors not intended to receive said configuration. Isolating the specific overrides and dedicating the particular producer or consumer instance to the connector strikes me as the safest way to do it as proposed here. I am also in favor of blacklisting critical configuration values such as `bootstrap.servers` from being overridden. If that strikes people as too heavy-handed, then perhaps a loud warning; after all, the worst case outcome is that the particular connector will fail. Any other connectors will simply continue to use the worker-level configured values, assuming we continue with the approach of isolating the overrides to the source/sink task. > Add connector level configurability for producer/consumer client configs > > > Key: KAFKA-6890 > URL: https://issues.apache.org/jira/browse/KAFKA-6890 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Allen Tang >Priority: Minor > > Right now, each source connector and sink connector inherit their client > configurations from the worker properties. Within the worker properties, all > configurations that have a prefix of "producer." or "consumer." are applied > to all source connectors and sink connectors respectively. > We should also provide connector-level overrides whereby connector properties > that are prefixed with "producer." and "consumer." are used to feed into the > producer and consumer clients embedded within source and sink connectors > respectively. The prefixes will be removed via a String#substring() call, and > the remainder of the connector property key will be used as the client > configuration key. The value is fed directly to the client as the > configuration value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
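For illustration, the prefix handling the description proposes, as a stand-alone sketch (not the actual Connect code):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Connector properties starting with "producer." are stripped of the prefix
// and fed to that connector's embedded producer; "consumer." works the same
// way for sink connectors.
public class ConnectorClientOverrides {
    public static Map<String, Object> producerOverrides(Map<String, String> connectorProps) {
        Map<String, Object> clientProps = new HashMap<>();
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith("producer.")) {
                // e.g. "producer.max.request.size" becomes "max.request.size"
                clientProps.put(e.getKey().substring("producer.".length()), e.getValue());
            }
        }
        return clientProps;
    }
}
{code}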