[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714439#comment-16714439 ]

Jonathan Santilli commented on KAFKA-7678:
--

Hello [~guozhang], I can confirm that the issue of calling the *.close()* method on a null reference has been solved. However, I do not know why it was happening (I did not dig deep enough into the code to understand the reason). Maybe we can create another Jira to explore it. What do you think?

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.1.1, 2.0.1, 2.1.0
> Reporter: Jonathan Santilli
> Assignee: Jonathan Santilli
> Priority: Minor
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Streams application and
> the process (the Kafka Streams application) receives a *SIGTERM* to stop it
> gracefully.
>
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
> {noformat}
>
> I have checked the code, and the method
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*`
> class anticipates any kind of error, since it catches `*Throwable*`:
>
> {noformat}
> try {
>     recordCollector.close();
> } catch (final Throwable e) {
>     log.error("Failed to close producer due to the following error:", e);
> } finally {
>     producer = null;
> }
> {noformat}
>
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     producer.close();
>     producer = null;
>     checkForException();
> }
> {noformat}
>
> Change it to:
>
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (Objects.nonNull(producer)) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }
> {noformat}
>
> How does that sound?
>
> Kafka Brokers running 2.0.0
> Kafka Streams and client 2.1.0
> OpenJDK 8

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
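A minimal, self-contained sketch of why the proposed null check avoids the exception. It assumes a hypothetical `StubProducer` in place of `KafkaProducer` and leaves out `checkForException()` and logging; the two collector classes only copy the shape of the `close()` methods quoted in the issue and are not the actual `RecordCollectorImpl`.

```java
import java.util.Objects;

// Hypothetical stand-in for a KafkaProducer; it only counts close() calls.
class StubProducer {
    int closeCalls = 0;

    void close() {
        closeCalls++;
    }
}

// Same shape as the original close(): a second call dereferences a null
// producer and throws NullPointerException.
class UnguardedCollector {
    private StubProducer producer;

    UnguardedCollector(StubProducer producer) {
        this.producer = producer;
    }

    void close() {
        producer.close();
        producer = null;
    }
}

// Same shape as the proposed change: the null check makes close() idempotent.
class NullGuardedCollector {
    private StubProducer producer;

    NullGuardedCollector(StubProducer producer) {
        this.producer = producer;
    }

    void close() {
        if (Objects.nonNull(producer)) {
            producer.close();
            producer = null;
        }
    }
}

class DoubleCloseDemo {
    public static void main(String[] args) {
        UnguardedCollector unguarded = new UnguardedCollector(new StubProducer());
        unguarded.close();
        try {
            unguarded.close();
        } catch (NullPointerException e) {
            System.out.println("unguarded: second close() threw NullPointerException");
        }

        StubProducer producer = new StubProducer();
        NullGuardedCollector guarded = new NullGuardedCollector(producer);
        guarded.close();
        guarded.close();
        System.out.println("guarded: producer.close() ran " + producer.closeCalls + " time(s)");
    }
}
```

The guard only hides the symptom: the second `close()` becomes a no-op, but the sketch says nothing about why `close()` is reached twice in the first place.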
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714486#comment-16714486 ]

ASF GitHub Bot commented on KAFKA-7580:
---

sarveshtamba opened a new pull request #6020: Fix for Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user(#KAFKA-7580)
URL: https://github.com/apache/kafka/pull/6020

Fix for Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user(#KAFKA-7580)
Refer https://issues.apache.org/jira/browse/KAFKA-7580
gradle unitTest passes successfully with message "BUILD SUCCESSFUL"

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714492#comment-16714492 ]

Sarvesh Tamba commented on KAFKA-7580:
--

Created a PR for "Fix for Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user" at [https://github.com/apache/kafka/pull/6020]

The issues of "pure virtual method" should be tracked separately.
[jira] [Commented] (KAFKA-7581) Issues in building kafka using gradle on a Ubuntu based docker container
[ https://issues.apache.org/jira/browse/KAFKA-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714498#comment-16714498 ]

Sarvesh Tamba commented on KAFKA-7581:
--

Is there any progress on this?

> Issues in building kafka using gradle on a Ubuntu based docker container
>
>
> Key: KAFKA-7581
> URL: https://issues.apache.org/jira/browse/KAFKA-7581
> Project: Kafka
> Issue Type: Bug
> Components: build
> Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
> Reporter: Sarvesh Tamba
> Priority: Blocker
>
> The following issues are seen when kafka is built using gradle on a Ubuntu
> based docker container:-
> /kafka-gradle/kafka-2.0.0/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:177: File name too long
> This can happen on some encrypted or legacy file systems. Please see SI-3623 for more details.
> .foreach { txnMetadataCacheEntry =>
> ^
> 56 warnings found
> one error found
> > Task :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':core:compileScala'.
> > Compilation failed
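The scalac error above reports a generated file name that the file system refuses. One way to check which limit a given directory actually enforces is a small probe; the sketch below is a hypothetical diagnostic (the class and method names are made up), assuming that if a name of length n fails, every longer name fails too. It binary-searches for the longest ASCII file name it can create in a scratch directory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

class FileNameLengthProbe {
    // Tries to create (then delete) a file whose name is `length` ASCII characters.
    // Any failure, including "File name too long", counts as "cannot create".
    static boolean canCreate(Path dir, int length) {
        StringBuilder name = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            name.append('a');
        }
        try {
            Path file = dir.resolve(name.toString());
            Files.createFile(file);
            Files.delete(file);
            return true;
        } catch (IOException | InvalidPathException e) {
            return false;
        }
    }

    // Binary search for the longest creatable name (ASCII, so characters == bytes)
    // up to `upperBound`, inside a scratch directory created under `parent`.
    static int maxFileNameLength(Path parent, int upperBound) {
        Path scratch;
        try {
            scratch = Files.createTempDirectory(parent, "fnprobe");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try {
            int lo = 0;          // longest length known to work (0 = none yet)
            int hi = upperBound; // longest length that might still work
            while (lo < hi) {
                int mid = lo + (hi - lo + 1) / 2;
                if (canCreate(scratch, mid)) {
                    lo = mid;
                } else {
                    hi = mid - 1;
                }
            }
            return lo;
        } finally {
            try {
                Files.deleteIfExists(scratch);
            } catch (IOException ignored) {
                // best-effort cleanup of the scratch directory
            }
        }
    }

    public static void main(String[] args) {
        Path dir = Paths.get(args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir"));
        System.out.println("longest creatable file name in " + dir + ": "
                + maxFileNameLength(dir, 1024) + " characters");
    }
}
```

Running it with the build directory inside the container as the argument (for example, the directory holding `/kafka-gradle/kafka-2.0.0`) would show whether that mount enforces a shorter limit than the usual 255 bytes.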
[jira] [Created] (KAFKA-7716) Unprocessed messages when Broker fails
Finbarr Naughton created KAFKA-7716:
---

Summary: Unprocessed messages when Broker fails
Key: KAFKA-7716
URL: https://issues.apache.org/jira/browse/KAFKA-7716
Project: Kafka
Issue Type: Bug
Components: core, streams
Affects Versions: 2.0.1, 1.0.0
Reporter: Finbarr Naughton

This occurs when running on Kubernetes on bare metal.

A Streams application with a single topology listens to two input topics, A and B. A is read as a GlobalKTable, B as a KStream. The topology joins the stream to the GlobalKTable and writes an updated message to topic A. The application is configured to use exactly_once processing.

There are three worker nodes. Kafka brokers are deployed as a statefulset on the three nodes using the helm chart from here: [https://github.com/helm/charts/tree/master/incubator/kafka]

The application has three instances spread across the three nodes.

During a test, topic A is pre-populated with 50k messages over 5 minutes. Then 50k messages with the same key set are sent to topic B over 5 minutes. The expected behaviour is that topic A will contain 50k updated messages afterwards. While all brokers are available this is the case, even when one of the application pods is deleted.

When a broker fails, however, a few expected updated messages fail to appear on topic A despite their existence on topic B.

A more complete description is here: [https://stackoverflow.com/questions/53557247/some-events-unprocessed-by-kafka-streams-application-on-kubernetes-on-bare-metal]
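For readers reproducing the setup, a minimal sketch of the exactly-once configuration the report mentions. It uses literal property keys so it runs without the kafka-streams jar; the keys and the `exactly_once` value match the `StreamsConfig` constants named in the comments (2.x naming). The application id and bootstrap address in `main` are hypothetical placeholders.

```java
import java.util.Properties;

class ExactlyOnceStreamsConfig {
    // Builds Streams properties with exactly-once processing enabled.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);        // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers);  // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("processing.guarantee", "exactly_once"); // StreamsConfig.PROCESSING_GUARANTEE_CONFIG = StreamsConfig.EXACTLY_ONCE
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical application id and broker address, for illustration only.
        Properties props = build("join-app", "kafka-0.kafka-headless:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In the application described, these properties would be passed to `new KafkaStreams(topology, props)`, with a topology that reads A via `StreamsBuilder#globalTable`, joins the B stream to it with `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`, and writes the result back to A.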
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714522#comment-16714522 ]

Sarvesh Tamba commented on KAFKA-7580:
--

[~mjsax], are there any plans to look into the "pure virtual method" issues? I have not been able to figure out why this happens, or why it is intermittent/random. Can you please check?
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714530#comment-16714530 ]

Sarvesh Tamba commented on KAFKA-7580:
--

To summarize, the "pure virtual method" issue is seen on both Ubuntu 16.04 and SLES 12 SP2, as both root and non-root users in each of these environments.

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
> Reporter: Sarvesh Tamba
> Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing
> unit test:-
> ./gradlew streams:unitTest --tests org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
>
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>     java.lang.AssertionError: Expected exception: org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
>
> However, for a non-root user the test case passes:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
>
> The failing unit test, "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir", creates a temporary
> directory and sets it as read-only. The unit test expects a
> "ProcessorStateException" to be thrown when the read-only temporary
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only directory, so the
> exception is rightly thrown in the unit test (which is the intention of the unit test,
> and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
> mkdir: cannot create directory ‘/tmp/readOnlyDir/parent’: Permission denied
>
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
> ls: cannot access '/tmp/readOnlyDir/..': Permission denied
> ls: cannot access '/tmp/readOnlyDir/.': Permission denied
> ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
> ls: cannot access '/tmp/readOnlyDir/child': Permission denied
> total 0
> d? ? ? ? ? ? ./
> d? ? ? ? ? ? ../
> d? ? ? ? ? ? child/
> d? ? ? ? ? ? kid/
>
> However, by default, the root user can access any file in the system:-
> root@p006vm18:/tmp# ll /tmp/readOnlyDir/
> total 112
> dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
> drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
> drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
> drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>
> root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>
> root@p006vm18:/tmp# ll /tmp/readOnlyDir/
> total 116
> dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
> drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
> drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
> drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
> drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
>
> Hence the unit test does not throw a "ProcessorStateException"
> when the read-only temporary directory is opened, and the unit test
> rightly fails for a root user.
>
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit test as root user, make the temporary directory
> immutable in the unit test code and then test for the exception (needs code
> changes in the unit tests):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyD
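A minimal sketch related to approach 1: rather than requiring a non-root user, a test could first probe whether read-only directories are actually enforced for the current user and skip itself when they are not. The probe below is hypothetical (it is not the change in PR #6020, whose diff is not shown in this thread) and assumes a POSIX file system, since `Files.setPosixFilePermissions` throws `UnsupportedOperationException` elsewhere.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

class ReadOnlyDirProbe {
    // Creates a scratch directory, makes it read-only (r-xr-xr-x) and tries to
    // create a child directory in it. Returns true if the OS refuses (as for a
    // normal user), false if the child is created anyway (e.g. when running as root).
    static boolean readOnlyDirectoriesAreEnforced() {
        Path dir = null;
        try {
            dir = Files.createTempDirectory("readOnlyProbe");
            Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("r-xr-xr-x"));
            Path child = dir.resolve("child");
            try {
                Files.createDirectory(child);
            } catch (AccessDeniedException expected) {
                return true;
            }
            Files.delete(child); // the permission check was bypassed; clean up
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (dir != null) {
                try {
                    Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("rwx------"));
                    Files.deleteIfExists(dir);
                } catch (IOException ignored) {
                    // best-effort cleanup of the scratch directory
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("user.name=" + System.getProperty("user.name")
                + ", read-only directories enforced: " + readOnlyDirectoriesAreEnforced());
    }
}
```

In a JUnit 4 test, a call such as `org.junit.Assume.assumeTrue("read-only directories are not enforced for this user", ReadOnlyDirProbe.readOnlyDirectoriesAreEnforced());` at the start of the test would mark it as skipped rather than failed when run as root. Approach 2 (`chattr +i`) needs root privileges itself and is not covered by this sketch.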
[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user
[ https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714524#comment-16714524 ]

Sarvesh Tamba commented on KAFKA-7580:
--

FYI, following are the environment details:-

p006vm10:~ # lscpu
Architecture: ppc64le
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 16
NUMA node(s): 1
Model: 2.1 (pvr 004b 0201)
Model name: POWER8E (raw), altivec supported
Hypervisor vendor: KVM
Virtualization type: para
L1d cache: 64K
L1i cache: 32K
NUMA node0 CPU(s): 0-15
p006vm10:~ # arch
ppc64le
p006vm10:~ # cat /etc/os-release
NAME="SLES"
VERSION="12-SP2"
VERSION_ID="12.2"
PRETTY_NAME="SUSE Linux Enterprise Server 12 SP2"
ID="sles"
ANSI_COLOR="0;32"
CPE_NAME="cpe:/o:suse:sles:12:sp2"
p006vm10:~ #
[jira] [Commented] (KAFKA-7581) Issues in building kafka using gradle on a Ubuntu based docker container
[ https://issues.apache.org/jira/browse/KAFKA-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714543#comment-16714543 ]

点儿郎当 commented on KAFKA-7581:
-

No progress; I directly rebuilt a version of Kafka 2.0. However, [java.io.IOException: Too many open files] occurs frequently. What is the problem? The Linux ulimit has already been adjusted. There are 12 disks of 8 TB each, handling more than 60,000 QPS. Could the machines be unable to handle the load? There are three machines with 64 GB of memory and 32 CPU cores. GC has been checked and shows no problem.
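When "Too many open files" persists after raising the ulimit, it can help to confirm the limit the JVM process actually runs with, since resource limits are per process and may differ from what an interactive shell reports. A minimal sketch using the JDK's `com.sun.management.UnixOperatingSystemMXBean`; it reports on its own JVM, so it is only a model of the check. For a running broker, the same values are exposed over JMX as the `OpenFileDescriptorCount` and `MaxFileDescriptorCount` attributes of the `java.lang:type=OperatingSystem` MBean.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FileDescriptorUsage {
    // Returns {open, max} for the current JVM process, or null when the JVM
    // does not expose com.sun.management.UnixOperatingSystemMXBean (e.g. on Windows).
    static long[] openAndMax() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {unix.getOpenFileDescriptorCount(), unix.getMaxFileDescriptorCount()};
        }
        return null;
    }

    public static void main(String[] args) {
        long[] counts = openAndMax();
        if (counts == null) {
            System.out.println("file descriptor counts are not available on this JVM/OS");
            return;
        }
        System.out.printf("open file descriptors: %d, limit seen by this JVM: %d%n", counts[0], counts[1]);
    }
}
```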
[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
[ https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715004#comment-16715004 ]

Viktor Somogyi commented on KAFKA-7703:
---

[~zsxwing], a quick update: I have looked at the code and reproduced the issue based on your test, and I'm now trying to figure out the best solution. I'll post another update when I have a solution proposal.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
>
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
> Reporter: Shixiong Zhu
> Assignee: Viktor Somogyi
> Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong
> offset set by another reset request.
> Here is a reproducer:
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in the background.
> However, after "seekToEnd" is called, due to a race condition in
> "Fetcher.resetOffsetIfNeeded" (it's not atomic: "seekToEnd" could happen
> between the check
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
> and the seek
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
> "KafkaConsumer.position" may return an "earliest" offset.
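The race described in the issue is a check-then-act problem. The sketch below is a hypothetical, heavily simplified model of one partition's reset state (it is not Kafka's `Fetcher` or `SubscriptionState` code, and all names are made up). It replays the reported interleaving deterministically and contrasts it with one possible guard: completing the reset in a single synchronized step that also checks which reset request the response belongs to.

```java
// Hypothetical reset kinds; a stand-in for the consumer's reset strategies.
enum ResetKind { EARLIEST, LATEST }

// Hypothetical model of one partition's reset state, showing only the pattern.
class ModelPartitionState {
    private ResetKind pendingReset; // null when no reset is pending
    private long position = -1L;

    synchronized void requestReset(ResetKind kind) { // e.g. seekToEnd() -> LATEST
        pendingReset = kind;
    }

    synchronized long position() {
        return position;
    }

    // Naive pattern in two separate steps: check, then seek.
    synchronized boolean isResetPending() {
        return pendingReset != null;
    }

    synchronized void seekAndClearReset(long offset) {
        position = offset;
        pendingReset = null;
    }

    // Guarded pattern: one atomic step that also checks *which* reset the
    // response answers, so a stale EARLIEST response cannot override LATEST.
    synchronized boolean completeReset(ResetKind requestedKind, long offset) {
        if (pendingReset != requestedKind) {
            return false; // stale response; ignore it
        }
        position = offset;
        pendingReset = null;
        return true;
    }
}

class ResetRaceDemo {
    public static void main(String[] args) {
        ModelPartitionState naive = new ModelPartitionState();
        naive.requestReset(ResetKind.EARLIEST);       // poll(0) sends an "earliest" request
        boolean passedCheck = naive.isResetPending(); // response handler: check
        naive.requestReset(ResetKind.LATEST);         // user thread: seekToEnd()
        if (passedCheck) {
            naive.seekAndClearReset(0L);              // response handler: seek to earliest
        }
        System.out.println("naive position: " + naive.position());

        ModelPartitionState guarded = new ModelPartitionState();
        guarded.requestReset(ResetKind.EARLIEST);
        guarded.requestReset(ResetKind.LATEST);        // seekToEnd() before the response is handled
        guarded.completeReset(ResetKind.EARLIEST, 0L); // rejected as stale
        guarded.completeReset(ResetKind.LATEST, 100L); // hypothetical end offset
        System.out.println("guarded position: " + guarded.position());
    }
}
```

Because `completeReset` checks and updates under one lock, `seekToEnd()` can only land entirely before it (the stale response is rejected) or entirely after it (a new LATEST reset is pending again), never in between.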
[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715220#comment-16715220 ] Guozhang Wang commented on KAFKA-7678: -- There are one or two edge cases that can cause the record collector to be closed multiple times. We noticed them recently and are thinking about cleaning up the classes along the call hierarchy (i.e. from TaskManager -> Task -> RecordCollector) to address this. One example is: 1) a task is *suspended* with EOS turned on (as in your case), and the record collector is closed; 2) then the instance is killed (SIGTERM), which causes all threads to be closed, which in turn causes all their owned tasks to be *closed*. The same record collector close() call is then triggered a second time. > Failed to close producer due to java.lang.NullPointerException > -- > > Key: KAFKA-7678 > URL: https://issues.apache.org/jira/browse/KAFKA-7678 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.0.1, 2.1.0 >Reporter: Jonathan Santilli >Assignee: Jonathan Santilli >Priority: Minor > Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2 > > > This occurs when the group is rebalancing in a Kafka Streams application and > the process (the Kafka Streams application) receives a *SIGTERM* to stop it > gracefully.
> > > {noformat} > ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] > Failed to close producer due to the following error: > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) > at > org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} > > > I have checked the code, and the method > `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` > class expects any kind of error to happen, since it catches > `*Throwable*`: > > > > {noformat} > try { > recordCollector.close(); > } catch (final Throwable e) { > log.error("Failed to close producer due to the following error:", e); > } finally { > producer = null; > }{noformat} > > Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the > `*RecordCollectorImpl*.*java*` class: > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > producer.close(); > producer = null; > checkForException(); > }{noformat} > > Change it to: > > {noformat} > @Override > public void close() { > log.debug("Closing producer"); > if ( Objects.nonNull(producer) ) { > producer.close(); > producer = null; > } > checkForException(); > }{noformat} > > How does that sound? > > Kafka Brokers running 2.0.0 > Kafka Streams and client 2.1.0 > OpenJDK 8 >
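Combining Guozhang's double-close scenario (suspend closes the collector, then shutdown closes it again) with the proposed null check gives an idempotent close. A minimal sketch, assuming a hypothetical `SketchRecordCollector` with a plain `java.io.Closeable` standing in for the producer; this is not the actual `RecordCollectorImpl`, which also calls `checkForException()` after closing.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical stand-in for RecordCollectorImpl: a plain Closeable plays the
 * role of the producer so the close-twice scenario can run in isolation.
 */
final class SketchRecordCollector {
    private Closeable producer;

    SketchRecordCollector(Closeable producer) {
        this.producer = producer;
    }

    /** Idempotent: a second call (e.g. close after suspend) is a no-op, not an NPE. */
    void close() {
        if (producer == null) {
            return; // already closed, e.g. when the task was suspended earlier
        }
        try {
            producer.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            producer = null; // mark as closed even if close() failed
        }
        // The real close() would call checkForException() here; omitted in this sketch.
    }
}
```

Calling `close()` once on suspend and again on shutdown closes the underlying producer exactly once.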
[jira] [Resolved] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client
[ https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-7549. Resolution: Fixed > Old ProduceRequest with zstd compression does not return error to client > > > Key: KAFKA-7549 > URL: https://issues.apache.org/jira/browse/KAFKA-7549 > Project: Kafka > Issue Type: Bug > Components: compression >Reporter: Magnus Edenhill >Assignee: Lee Dongjin >Priority: Major > Fix For: 2.2.0, 2.1.1 > > > Kafka broker v2.1.0rc0. > > KIP-110 states that: > "Zstd will only be allowed for the bumped produce API. That is, for older > version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE > regardless of the message format." > > However, sending a ProduceRequest V3 with zstd compression (which is a client > side bug) closes the connection with the following exception rather than > returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse: > > {noformat} > [2018-10-25 11:40:31,813] ERROR Exception while processing request from > 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: PRODUCE, apiVersion: 3, connectionId: > 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), > principal: User:ANONYMOUS > Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce > requests with version 3 are note allowed to use ZStandard compression > {noformat}
[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client
[ https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715218#comment-16715218 ] ASF GitHub Bot commented on KAFKA-7549: --- hachikuji closed pull request #5925: KAFKA-7549: Old ProduceRequest with zstd compression does not return error to client URL: https://github.com/apache/kafka/pull/5925 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index f87090eba6a..9f9de42c866 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.CommonFields; import org.apache.kafka.common.protocol.Errors; @@ -172,6 +173,21 @@ public Builder(short minVersion, @Override public ProduceRequest build(short version) { +return build(version, true); +} + +// Visible for testing only +public ProduceRequest buildUnsafe(short version) { +return build(version, false); +} + +private ProduceRequest build(short version, boolean validate) { +if (validate) { +// Validate the given records first +for (MemoryRecords records : partitionRecords.values()) { +ProduceRequest.validateRecords(version, records); +} +} return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId); } @@ -210,8 +226,9 @@ private ProduceRequest(short version, short acks, 
int timeout, Map createPartitionSizes(Map partitionRecords) { @@ -231,7 +248,7 @@ public ProduceRequest(Struct struct, short version) { Struct partitionResponse = (Struct) partitionResponseObj; int partition = partitionResponse.get(PARTITION_ID); MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); -validateRecords(version, records); +setFlags(records); partitionRecords.put(new TopicPartition(topic, partition), records); } } @@ -241,32 +258,11 @@ public ProduceRequest(Struct struct, short version) { transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); } -private void validateRecords(short version, MemoryRecords records) { -if (version >= 3) { -Iterator iterator = records.batches().iterator(); -if (!iterator.hasNext()) -throw new InvalidRecordException("Produce requests with version " + version + " must have at least " + -"one record batch"); - -MutableRecordBatch entry = iterator.next(); -if (entry.magic() != RecordBatch.MAGIC_VALUE_V2) -throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + -"contain record batches with magic version 2"); -if (version < 7 && entry.compressionType() == CompressionType.ZSTD) { -throw new InvalidRecordException("Produce requests with version " + version + " are note allowed to " + -"use ZStandard compression"); -} - -if (iterator.hasNext()) -throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " + -"contain exactly one record batch"); -idempotent = entry.hasProducerId(); -transactional = entry.isTransactional(); -} - -// Note that we do not do similar validation for older versions to ensure compatibility with -// clients which send the wrong magic version in the wrong version of the produce request. The broker -// did not do this validation before, so we maintain that behavior here. 
+private void setFlags(MemoryRecords records) { +Iterator iterator = records.batches().iterator(); +MutableRecordBatch entry = iterator.next(); +idempotent = entry.hasProducerId(); +transactional = entry.isTransactional(); } /** @@ -394,6 +390,32 @@ public void clearPartitionRecords() { partitionRecords = null; }
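The issue boils down to one rule from KIP-110: a produce request older than version 7 must not carry zstd batches, and the broker should answer with UNSUPPORTED_COMPRESSION_TYPE instead of dropping the connection. A minimal sketch of that rule as a pure function that returns an error code rather than throwing, assuming simplified, hypothetical enums (`Codec`, `ProduceError`) in place of Kafka's `CompressionType` and `Errors`; the threshold 7 comes from the removed `version < 7` check in the diff.

```java
import java.util.Objects;

/** Simplified subset of compression codecs (hypothetical stand-in for CompressionType). */
enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

/** Simplified subset of error codes relevant to this check (hypothetical). */
enum ProduceError { NONE, UNSUPPORTED_COMPRESSION_TYPE }

final class ProduceVersionCheck {
    /** First produce request version allowed to carry zstd batches. */
    static final short MIN_ZSTD_PRODUCE_VERSION = 7;

    private ProduceVersionCheck() { }

    /** Returns an error code for the response instead of failing request parsing. */
    static ProduceError validate(short version, Codec codec) {
        Objects.requireNonNull(codec, "codec");
        if (codec == Codec.ZSTD && version < MIN_ZSTD_PRODUCE_VERSION) {
            return ProduceError.UNSUPPORTED_COMPRESSION_TYPE;
        }
        return ProduceError.NONE;
    }
}
```

Returning a value keeps the connection open: the broker can put the error into the per-partition ProduceResponse, which is the behaviour KIP-110 describes.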
[jira] [Assigned] (KAFKA-7681) new metric for request thread utilization by request type
[ https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mayuresh Gharat reassigned KAFKA-7681: -- Assignee: Mayuresh Gharat > new metric for request thread utilization by request type > - > > Key: KAFKA-7681 > URL: https://issues.apache.org/jira/browse/KAFKA-7681 > Project: Kafka > Issue Type: Improvement >Reporter: Jun Rao >Assignee: Mayuresh Gharat >Priority: Major > > When the request thread pool is saturated, it's often useful to know which > type of request is using the thread pool the most. It would be useful to add a > metric that tracks the fraction of request thread pool usage by request type. > This would be equivalent to (request rate) * (request local time ms) / 1000, > but would be more direct. This would require a new KIP.
[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type
[ https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715264#comment-16715264 ] Mayuresh Gharat commented on KAFKA-7681: [~junrao] I will give this a shot. > new metric for request thread utilization by request type > - > > Key: KAFKA-7681 > URL: https://issues.apache.org/jira/browse/KAFKA-7681 > Project: Kafka > Issue Type: Improvement >Reporter: Jun Rao >Assignee: Mayuresh Gharat >Priority: Major > > When the request thread pool is saturated, it's often useful to know which > type of request is using the thread pool the most. It would be useful to add a > metric that tracks the fraction of request thread pool usage by request type. > This would be equivalent to (request rate) * (request local time ms) / 1000, > but would be more direct. This would require a new KIP.
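As a worked version of the estimate in the description: request rate (requests/s) times local time (ms) divided by 1000 gives thread-seconds of handler time per second, i.e. the average number of request handler threads one request type keeps busy. Dividing by the pool size (the broker's `num.io.threads`) turns that into a fraction of the pool; that normalization is this sketch's interpretation, not something stated in the ticket, and the class name is hypothetical.

```java
/** Hypothetical helper illustrating the (rate * local time ms / 1000) estimate. */
final class RequestThreadUsage {
    private RequestThreadUsage() { }

    /** Average number of handler threads kept busy by one request type. */
    static double busyThreads(double requestsPerSec, double localTimeMs) {
        return requestsPerSec * localTimeMs / 1000.0;
    }

    /** The same estimate as a fraction of a pool with poolSize handler threads. */
    static double poolFraction(double requestsPerSec, double localTimeMs, int poolSize) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive");
        }
        return busyThreads(requestsPerSec, localTimeMs) / poolSize;
    }
}
```

For example, 200 requests/s with 15 ms of local time each keeps 200 * 15 / 1000 = 3.0 threads busy; with 8 handler threads that is 0.375 of the pool.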
[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715471#comment-16715471 ] Navinder Brar commented on KAFKA-6144: -- Sure, [~NIzhikov], I will start writing a KIP. > Allow state stores to serve stale reads during rebalance > > > Key: KAFKA-6144 > URL: https://issues.apache.org/jira/browse/KAFKA-6144 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Antony Stubbs >Priority: Major > Labels: needs-kip > > Currently, when expanding the KS cluster, the new node's partitions are > unavailable during the rebalance, which for large states can take a very long > time; even for small state stores, more than a few ms of unavailability can be a deal breaker > for microservice use cases. > One workaround is to allow stale data to be read from the state stores when the > use case allows. > Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - > potentially a two phase rebalance > This is the description from KAFKA-6031 (keeping this JIRA as the title is > more descriptive): > {quote} > Currently reads for a key are served by a single replica, which has 2 drawbacks: > - if the replica is down, there is downtime in serving reads for keys it was > responsible for until a standby replica takes over > - in case of semantic partitioning, some replicas might become hot and there > is no easy way to scale the read load > If standby replicas had endpoints exposed in StreamsMetadata, > it would enable serving reads from several replicas, which would mitigate the > above drawbacks. > Due to the lag between replicas, reading from multiple replicas simultaneously > would have weaker (eventual) consistency compared to reads from a single > replica. This, however, should be an acceptable tradeoff in many cases. > {quote}
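The trade-off in the quoted KAFKA-6031 text can be illustrated with a small routing sketch. Everything here is hypothetical (`ReplicaRouter`, `RoutedRead`, host strings, an availability predicate) and is not a Kafka Streams API: reads prefer the active replica, fall back to a standby only when the caller opts into stale reads, and tag the result so the caller knows it may lag.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Where a key's value is read from; standby reads may lag the active replica. */
final class RoutedRead {
    final String host;
    final boolean possiblyStale;

    RoutedRead(String host, boolean possiblyStale) {
        this.host = host;
        this.possiblyStale = possiblyStale;
    }
}

/** Hypothetical router: active replica first, standbys only if stale reads are allowed. */
final class ReplicaRouter {
    private ReplicaRouter() { }

    static Optional<RoutedRead> route(String active,
                                      List<String> standbys,
                                      Predicate<String> isAvailable,
                                      boolean allowStale) {
        if (active != null && isAvailable.test(active)) {
            return Optional.of(new RoutedRead(active, false)); // strongly consistent read
        }
        if (!allowStale) {
            return Optional.empty(); // caller must wait, e.g. until the rebalance finishes
        }
        for (String standby : standbys) {
            if (isAvailable.test(standby)) {
                return Optional.of(new RoutedRead(standby, true)); // eventually consistent
            }
        }
        return Optional.empty();
    }
}
```

Making staleness an explicit opt-in keeps today's single-replica semantics as the default while letting latency-sensitive callers accept eventual consistency during a rebalance.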
[jira] [Created] (KAFKA-7717) Enable security configs in kafka.tools.EndToEndLatency
Stanislav Kozlovski created KAFKA-7717: -- Summary: Enable security configs in kafka.tools.EndToEndLatency Key: KAFKA-7717 URL: https://issues.apache.org/jira/browse/KAFKA-7717 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski The [end to end latency tool|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/EndToEndLatency.scala] does not support security configurations for authenticating to a secured broker. It only accepts `bootstrap.servers`, rendering it useless against SASL-secured clusters.
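A minimal sketch of how the tool could accept security settings, assuming a hypothetical option that points at a client properties file (the ticket does not specify the mechanism). The keys mentioned in the comment (`security.protocol`, `sasl.mechanism`, `sasl.jaas.config`) are standard Kafka client configs; the resulting `Properties` would be passed to both the producer and the consumer the tool creates.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical config builder for a latency tool; the file option itself is assumed. */
final class LatencyToolConfig {
    private LatencyToolConfig() { }

    static Properties clientProps(String bootstrapServers, Reader extraConfig) {
        Properties props = new Properties();
        if (extraConfig != null) {
            try {
                // e.g. security.protocol, sasl.mechanism, sasl.jaas.config, ssl.* settings
                props.load(extraConfig);
            } catch (IOException e) {
                throw new UncheckedIOException("could not read client config", e);
            }
        }
        // The explicit bootstrap list from the command line wins over the file.
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }
}
```

Loading a generic properties file, rather than adding one flag per security setting, keeps the tool in step with whatever client configs the brokers require.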
[jira] [Resolved] (KAFKA-6036) Enable logical materialization to physical materialization
[ https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6036. -- Resolution: Fixed Fix Version/s: 2.2.0 > Enable logical materialization to physical materialization > -- > > Key: KAFKA-6036 > URL: https://issues.apache.org/jira/browse/KAFKA-6036 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.2.0 > > > Today, whenever users specify a queryable store name for a KTable, we > always add a physical state store in the translated processor topology. > In some scenarios, we should consider not physically materializing the KTable > but only "logically" materializing it: when some simple transformation > operations, or even join operations, generate new KTables that > need to be materialized with a state store, we can use the changelog topic > of the previous KTable and apply the transformation logic upon restoration > instead of creating a new changelog topic. For example: > {code} > table1 = builder.table("topic1"); > table2 = table1.filter(..).join(table3); // table2 needs to be materialized > for joining > {code} > We can actually set the {{getter}} function of table2's materialized store, > say {{state2}}, to read from {{topic1}} and then apply the filter > operator, instead of creating a new {{state2-changelog}} topic in this case. > We can come up with a general internal optimization to determine when > to logically materialize, and decide whether we should allow users of the DSL > to "hint" whether to materialize or not (which may then need a KIP).
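The getter idea above can be sketched with plain Java collections: table2's getter reads from table1's store and re-applies the filter on each lookup, so no separate changelog is needed for the filtered table. `ValueGetter`, `MaterializedTable` and `FilteredView` are hypothetical names for this sketch, not the Kafka Streams internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Read-only lookup into a table (hypothetical, simplified). */
interface ValueGetter<K, V> {
    V get(K key);
}

/** Physically materialized upstream table, e.g. restored from topic1. */
final class MaterializedTable<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> store = new HashMap<>();

    void put(K key, V value) {
        store.put(key, value);
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

/**
 * Logically materialized filter(...) result: it keeps no store or changelog of
 * its own and re-applies the predicate to the upstream value on every read.
 */
final class FilteredView<K, V> implements ValueGetter<K, V> {
    private final ValueGetter<K, V> parent;
    private final Predicate<V> predicate;

    FilteredView(ValueGetter<K, V> parent, Predicate<V> predicate) {
        this.parent = parent;
        this.predicate = predicate;
    }

    @Override
    public V get(K key) {
        V value = parent.get(key);
        // A filtered-out value behaves like a missing key (null), as with KTable#filter.
        return (value != null && predicate.test(value)) ? value : null;
    }
}
```

The cost moves from storage to reads: every lookup on the filtered table pays for one upstream lookup plus the predicate, which is why the ticket limits this to simple transformations.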
[jira] [Created] (KAFKA-7718) Allow customized header inheritance for stateful operators in DSL
Guozhang Wang created KAFKA-7718: Summary: Allow customized header inheritance for stateful operators in DSL Key: KAFKA-7718 URL: https://issues.apache.org/jira/browse/KAFKA-7718 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang As a follow-up to https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API, we want to allow users to customize how record headers are inherited while traversing the topology at the DSL layer (at the lower-level Processor API layer, users are already able to customize and inherit the headers as they forward records to the next processor nodes). Today the headers are implicitly inherited throughout the topology without any modification within the Streams library. For stateless operators (filter, map, etc.) this default inheritance policy should be sufficient. For stateful operators where multiple input records may generate a single record (i.e. an n:1 transformation rather than a 1:1 mapping), we only inherit the headers of the triggering record, which would seem to be a "random" choice to users, and the other records' headers are lost. I'd propose we extend the DSL to allow users to customize the header inheritance policy for stateful operators, namely joins and aggregations. It would contain two parts: 1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control objects with an additional function that allows users to pass in a lambda function (let's say it's called HeadersMerger, but the name is subject to discussion in the KIP) that takes two Headers objects and returns a single Headers object. 2) On the implementation layer, we need to actually store the headers in the materialized state store so that they can be retrieved along with the record by the join / aggregation processor. This would change the byte layout of the state store values and hence should be considered carefully.
Then, when the join / aggregate processor is triggered, the Headers of both records will be retrieved (one from the triggering record, one read from the materialized state store) and passed to the HeadersMerger. Some low-hanging optimizations can be considered, though: e.g. if users have not overridden this interface, we can skip reading the headers from the other side altogether to save IO cost.
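A sketch of what the proposed HeadersMerger could look like. The ticket leaves the name and signature open for the KIP, so everything here is hypothetical; a simple `SimpleHeader` list stands in for Kafka's `Headers` so the example runs on its own. `triggeringOnly()` models today's implicit behaviour, and `concatenate()` is one example of a custom policy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Minimal header model (hypothetical stand-in for Kafka's Header). */
final class SimpleHeader {
    final String key;
    final byte[] value;

    SimpleHeader(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

/**
 * Hypothetical shape of the proposed merger: combine the triggering record's
 * headers with those of the record read from the materialized state store.
 */
@FunctionalInterface
interface HeadersMerger {
    List<SimpleHeader> merge(List<SimpleHeader> triggering, List<SimpleHeader> stored);

    /** Today's implicit policy: keep only the triggering record's headers. */
    static HeadersMerger triggeringOnly() {
        return (triggering, stored) -> triggering;
    }

    /** One possible custom policy: keep both sides, triggering record first. */
    static HeadersMerger concatenate() {
        return (triggering, stored) -> {
            List<SimpleHeader> merged = new ArrayList<>(triggering);
            merged.addAll(stored);
            return Collections.unmodifiableList(merged);
        };
    }
}
```

If the user keeps the default `triggeringOnly()` policy, the stored side's headers are never used, which is exactly the IO optimization the ticket mentions.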
[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715689#comment-16715689 ] ASF GitHub Bot commented on KAFKA-7610: --- guozhangwang closed pull request #5962: KAFKA-7610; Proactively timeout new group members if rebalance is delayed URL: https://github.com/apache/kafka/pull/5962 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala index 5f16acb6a85..93775182570 100644 --- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala +++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala @@ -26,11 +26,11 @@ import kafka.server.DelayedOperation private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, - heartbeatDeadline: Long, - sessionTimeout: Long) - extends DelayedOperation(sessionTimeout, Some(group.lock)) { + deadline: Long, + timeoutMs: Long) + extends DelayedOperation(timeoutMs, Some(group.lock)) { - override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) - override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) + override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _) + override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline) override def onComplete() = coordinator.onCompleteHeartbeat() } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 
db89f14592f..007c6eea75a 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int, case Empty | Dead => case PreparingRebalance => for (member <- group.allMemberMetadata) { -group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR)) +group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR)) } joinPurgatory.checkAndComplete(GroupKey(group.groupId)) @@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int, * Complete existing DelayedHeartbeats for the given member and schedule the next one */ private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { +completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs) + } + + private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = { // complete current heartbeat expectation member.latestHeartbeat = time.milliseconds() val memberKey = MemberKey(member.groupId, member.memberId) heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline -val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs -val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) +val deadline = member.latestHeartbeat + timeoutMs +val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) } @@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int, val memberId = clientId + "-" + group.generateMemberIdSuffix val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols) + +member.isNew = true + // update the newMemberAdded flag 
to indicate that the join group can be further delayed if (group.is(PreparingRebalance) && group.generationId == 0) group.newMemberAdded = true group.add(member, callback) + +// The session timeout does not affect new members since they do not have their memberId and +// cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted +// while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request +// timeout during a l
[jira] [Resolved] (KAFKA-7610) Detect consumer failures in initial JoinGroup
[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7610. -- Resolution: Fixed Assignee: Jason Gustafson (was: Boyang Chen) Fix Version/s: 2.1.1 2.2.0 > Detect consumer failures in initial JoinGroup > - > > Key: KAFKA-7610 > URL: https://issues.apache.org/jira/browse/KAFKA-7610 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.2.0, 2.1.1 > > > The session timeout and heartbeating logic in the consumer allow us to detect > failures after a consumer joins the group. However, we have no mechanism to > detect failures during a consumer's initial JoinGroup when its memberId is > empty. When a client fails (e.g. due to a disconnect), the newly created > MemberMetadata will be left in the group metadata cache. Typically when this > happens, the client simply retries the JoinGroup. Every retry results in a > new dangling member created and left in the group. These members are doomed > to a session timeout when the group finally finishes the rebalance, but > before that time, they are occupying memory. In extreme cases, when a > rebalance is delayed (possibly due to a buggy application), this cycle can > repeat and the cache can grow quite large. > There are a couple options that come to mind to fix the problem: > 1. During the initial JoinGroup, we can detect failed members when the TCP > connection fails. This is difficult at the moment because we do not have a > mechanism to propagate disconnects from the network layer. A potential option > is to treat the disconnect as just another type of request and pass it to the > handlers through the request queue. > 2. 
Rather than holding the JoinGroup in purgatory for an indefinite amount of > time, we can return earlier with the generated memberId and an error code > (say REBALANCE_IN_PROGRESS) to indicate that a retry is needed to complete the > rebalance. The consumer can then poll for the rebalance using its assigned > memberId, and we can detect failures through the session timeout. Obviously, > this option requires a KIP (and some more thought).
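The fix merged through PR #5962 marks members created by an initial JoinGroup as new and appears to schedule a separate expiration for them, since they cannot heartbeat yet. The following is a simplified, hypothetical model of that idea (not the GroupCoordinator code): each new member gets a deadline, a retrying client just adds another member, and a sweep removes members that never completed the join. The timeout value and the counter-based member ids are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of proactively expiring new members of a delayed rebalance. */
final class PendingGroup {
    private final long newMemberTimeoutMs;
    private final Map<String, Long> newMemberDeadlines = new LinkedHashMap<>();
    private int nextSuffix = 0;

    PendingGroup(long newMemberTimeoutMs) {
        this.newMemberTimeoutMs = newMemberTimeoutMs;
    }

    /** Initial JoinGroup (empty memberId): create a member and schedule its expiration. */
    String addNewMember(String clientId, long nowMs) {
        String memberId = clientId + "-" + (nextSuffix++); // real ids use a random suffix
        newMemberDeadlines.put(memberId, nowMs + newMemberTimeoutMs);
        return memberId;
    }

    /** The member completed the join, so heartbeats and the session timeout take over. */
    void markJoined(String memberId) {
        newMemberDeadlines.remove(memberId);
    }

    /** Remove new members whose deadline has passed; returns the removed ids. */
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = newMemberDeadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int newMemberCount() {
        return newMemberDeadlines.size();
    }
}
```

Each retry after a disconnect leaves at most one dangling member per timeout window, so the cache can no longer grow without bound while a rebalance is delayed.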
[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence
[ https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716210#comment-16716210 ] Nikolay Izhikov commented on KAFKA-7680: Hello [~junrao], [~hachikuji], I would like to pick up this ticket. Would you mind? > fetching a refilled chunk of log can cause log divergence > - > > Key: KAFKA-7680 > URL: https://issues.apache.org/jira/browse/KAFKA-7680 > Project: Kafka > Issue Type: Bug >Reporter: Jun Rao >Priority: Major > > We use FileRecords.writeTo to send a fetch response for a follower. A log > could be truncated and refilled in the middle of the send process (due to a > leader change). Then it's possible for the follower to append some > uncommitted messages followed by committed messages. Those uncommitted > messages may never be removed, causing log divergence. >
[jira] [Commented] (KAFKA-6393) Add tool to view active brokers
[ https://issues.apache.org/jira/browse/KAFKA-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716245#comment-16716245 ] Nikolay Izhikov commented on KAFKA-6393: Hello [~hachikuji], is this still relevant? I would like to pick up and resolve this JIRA. > Add tool to view active brokers > --- > > Key: KAFKA-6393 > URL: https://issues.apache.org/jira/browse/KAFKA-6393 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > > It would be helpful to have a tool to view the active brokers in the cluster. > For example, it could include the following: > 1. Broker id and version (maybe detected through an ApiVersions request) > 2. Broker listener information > 3. Whether the broker is online > 4. Which broker is the active controller > 5. Maybe some key configs (e.g. inter-broker version and message format > version)
[jira] [Comment Edited] (KAFKA-6393) Add tool to view active brokers
[ https://issues.apache.org/jira/browse/KAFKA-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716245#comment-16716245 ] Nikolay Izhikov edited comment on KAFKA-6393 at 12/11/18 5:42 AM: -- Hello [~hachikuji], is this still relevant? I would like to pick up and resolve this JIRA. was (Author: nizhikov): Helo, [~hachikuji] Is this still actual? I want to pick up and resolve this JIRA. > Add tool to view active brokers > --- > > Key: KAFKA-6393 > URL: https://issues.apache.org/jira/browse/KAFKA-6393 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Jason Gustafson >Priority: Major > Labels: needs-kip > > It would be helpful to have a tool to view the active brokers in the cluster. > For example, it could include the following: > 1. Broker id and version (maybe detected through an ApiVersions request) > 2. Broker listener information > 3. Whether the broker is online > 4. Which broker is the active controller > 5. Maybe some key configs (e.g. inter-broker version and message format > version)
[jira] [Commented] (KAFKA-6819) Refactor build-in StreamsMetrics internal implementations
[ https://issues.apache.org/jira/browse/KAFKA-6819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716255#comment-16716255 ] Nikolay Izhikov commented on KAFKA-6819: Hello [~vvcephei], is this ticket still relevant? I would like to pick it up. > Refactor build-in StreamsMetrics internal implementations > - > > Key: KAFKA-6819 > URL: https://issues.apache.org/jira/browse/KAFKA-6819 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > > Our current internal implementations of StreamsMetrics and the different layered > metrics like StreamMetricsThreadImpl, TaskMetrics, NodeMetrics, etc. are a bit > messy. We could improve on the current situation by doing the > following: > 0. For thread-level metrics, refactor the {{StreamsMetricsThreadImpl}} class > to {{ThreadMetrics}} such that a) it does not extend from > {{StreamsMetricsImpl}} but just include the {{StreamsMetricsThreadImpl}} as > its constructor parameters. And make its constructor, replacing with a static > {{addAllSensors(threadName)}} that tries to register all the thread-level > sensors for the given thread name. > 1. Add a static function for each of the built-in sensors of the thread-level > metrics in {{ThreadMetrics}} that relies on the internal > {{StreamsMetricsConventions}} to get thread-level sensor names. If the sensor > cannot be found in the internal {{Metrics}} registry, create the sensor > on the fly. > 2.a Add a static {{removeAllSensors(threadName)}} function in > {{ThreadMetrics}} that tries to de-register all the thread-level metrics for > this thread; if there are no sensors, it is a no-op. In > {{StreamThread#close()}} we will trigger this function; similarly, in > `TopologyTestDriver`, we will call this function when we close the driver. > As a result, the {{ThreadMetrics}} class itself would only contain > static functions with no member fields at all.
> 2.b We can consider doing the same for {{TaskMetrics}}, {{NodeMetrics}} and > {{NamedCacheMetrics}} as well, and add a {{StoreMetrics}} class following the > same pattern: although these metrics are not currently accessed outside their > enclosing classes, this may change in the future. > 3. Then, we only pass {{StreamsMetricsImpl}} around between the internal > classes, to access the specific sensor whenever we need to record it.
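The proposed ThreadMetrics shape (only static functions, sensors looked up or created on the fly, and a removeAllSensors that is a no-op when nothing is registered) can be sketched as follows. `SensorRegistry` and `Sensor` are hypothetical stand-ins for the internal `Metrics` registry and its sensors, passed explicitly in place of `StreamsMetricsImpl`; the sensor names are made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sensor: just counts and sums recorded values. */
final class Sensor {
    private long count;
    private double total;

    void record(double value) {
        count++;
        total += value;
    }

    long count() { return count; }

    double total() { return total; }
}

/** Hypothetical stand-in for the internal Metrics registry. */
final class SensorRegistry {
    private final Map<String, Sensor> sensors = new HashMap<>();

    /** Return the named sensor, creating it on the fly if it does not exist yet. */
    Sensor getOrCreate(String name) {
        return sensors.computeIfAbsent(name, n -> new Sensor());
    }

    void remove(String name) { sensors.remove(name); }

    int size() { return sensors.size(); }
}

/** Sketch of the proposed ThreadMetrics: static functions only, no member fields. */
final class ThreadMetricsSketch {
    private ThreadMetricsSketch() { }

    private static String[] builtInSensorNames() {
        return new String[] {"commit", "poll", "process"};
    }

    private static String sensorName(String threadName, String sensor) {
        return "thread." + threadName + "." + sensor;
    }

    /** Step 1: one accessor per built-in sensor, created on the fly if missing. */
    static Sensor commitSensor(SensorRegistry registry, String threadName) {
        return registry.getOrCreate(sensorName(threadName, "commit"));
    }

    /** Step 0: register all thread-level sensors for the given thread. */
    static void addAllSensors(SensorRegistry registry, String threadName) {
        for (String sensor : builtInSensorNames()) {
            registry.getOrCreate(sensorName(threadName, sensor));
        }
    }

    /** Step 2.a: de-register all thread-level sensors; a no-op if none exist. */
    static void removeAllSensors(SensorRegistry registry, String threadName) {
        for (String sensor : builtInSensorNames()) {
            registry.remove(sensorName(threadName, sensor));
        }
    }
}
```

Because the class holds no state, `StreamThread#close()` and `TopologyTestDriver` can both call `removeAllSensors` safely, even twice.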
[jira] [Commented] (KAFKA-6820) Improve on StreamsMetrics Public APIs
[ https://issues.apache.org/jira/browse/KAFKA-6820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716257#comment-16716257 ] Nikolay Izhikov commented on KAFKA-6820: Hello [~vvcephei], is this ticket still relevant? I would like to pick it up. > Improve on StreamsMetrics Public APIs > - > > Key: KAFKA-6820 > URL: https://issues.apache.org/jira/browse/KAFKA-6820 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > Our current `addLatencyAndThroughputSensor` and `addThroughputSensor` APIs are not > very well designed and hence not very user-friendly for people adding their > customized sensors. We could consider improving this feature. Some related > things to consider: > 1. Our internal built-in metrics should be independent of these public APIs, > which are for user-customized sensors only. See KAFKA-6819 for a related > description. > 2. We could restrict the possible scopeName values, and clearly document the > sensor hierarchies that result from the function calls. In this > way the library can help close users' sensors automatically when the > corresponding scope (store, task, thread, etc.) is being torn down.
[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence
[ https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716273#comment-16716273 ] Jun Rao commented on KAFKA-7680: [~NIzhikov], thanks for your interest. Feel free to assign the Jira to yourself. > fetching a refilled chunk of log can cause log divergence > - > > Key: KAFKA-7680 > URL: https://issues.apache.org/jira/browse/KAFKA-7680 > Project: Kafka > Issue Type: Bug >Reporter: Jun Rao >Priority: Major > > We use FileRecords.writeTo to send a fetch response for a follower. A log > could be truncated and refilled in the middle of the send process (due to > leader change). Then it's possible for the follower to append some > uncommitted messages followed by committed messages. Those uncommitted > messages may never be removed, causing log divergence.
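The race described above can be illustrated with a toy simulation. This is a hypothetical model, not Kafka code: `LogEntry` and `DivergenceDemo` are invented names, and the only assumption carried over from the ticket is that the send reads a fixed position range from the live log rather than from an immutable snapshot. Offsets 2 and 3 start out uncommitted under epoch 1; after the first entry has been sent, a leader change truncates them and refills the same offsets with (assumed committed) epoch-2 entries.

```java
import java.util.ArrayList;
import java.util.List;

// One entry of a simplified log: its offset plus the leader epoch that wrote it.
final class LogEntry {
    final long offset;
    final int epoch;

    LogEntry(long offset, int epoch) { this.offset = offset; this.epoch = epoch; }

    @Override public String toString() { return offset + "@e" + epoch; }
}

// Hypothetical simulation, not Kafka code: the sender fixes a position range up front and
// reads the live log position by position, rather than sending from an immutable snapshot.
final class DivergenceDemo {
    static List<LogEntry> send(List<LogEntry> log, int from, int to, Runnable midSendEvent) {
        List<LogEntry> sent = new ArrayList<>();
        for (int pos = from; pos < to; pos++) {
            if (pos == from + 1) {
                midSendEvent.run(); // the truncate-and-refill lands after the first entry is sent
            }
            sent.add(log.get(pos));
        }
        return sent;
    }

    // Offsets 2 and 3 start out uncommitted (epoch 1). A leader change truncates them and
    // refills the same offsets with committed entries (epoch 2) while the send is in flight.
    static List<LogEntry> runScenario() {
        List<LogEntry> log = new ArrayList<>();
        for (long offset = 0; offset < 4; offset++) {
            log.add(new LogEntry(offset, 1));
        }
        return send(log, 2, 4, () -> {
            log.subList(2, log.size()).clear(); // truncate back to offset 2
            log.add(new LogEntry(2, 2));        // refill offsets 2 and 3 under epoch 2
            log.add(new LogEntry(3, 2));
        });
    }
}
```

`DivergenceDemo.runScenario()` returns `[2@e1, 3@e2]`: the follower appends the stale, uncommitted entry at offset 2 followed by a committed entry at offset 3, even though the source log now holds `2@e2` at offset 2. That mismatch is the divergence the ticket describes.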
[jira] [Updated] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements
[ https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7523: --- Description: KIP-401: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756] I have found that when writing "low level" {{Processors}} and {{Transformers}} that are stateful, often I want these processors to "own" one or more state stores, the details of which are not important to the business logic of the application. However, when incorporating these into the topologies defined by the high-level API, using {{KStream::transform}} or {{KStream::process}}, I'm forced to specify the stores so the topology is wired up correctly. This creates an unfortunate pattern where the {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the pattern I've been following) holds the information about the names of the state stores, must be defined above the "high level" "fluent API"-style pipeline, which makes it hard to understand the business logic data flow. What I currently have to do: {code:java} TransformerSupplier transformerSupplier = new TransformerSupplierWithState(topology, val -> businessLogic(val)); builder.stream("in.topic") .transform(transformerSupplier, transformerSupplier.stateStoreNames()) .to("out.topic");{code} I have to both define the {{TransformerSupplier}} above the "fluent block", and pass the topology in so I can call {{topology.addStateStore()}} inside the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the state store names are for that point in the topology. The lambda {{val -> businessLogic(val)}} is really what I want to see in-line because that's the crux of what is happening, along with the name of some factory method describing what the transformer is doing for me internally.
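The proposal in this ticket amounts to letting the supplier carry its own store definitions so that a single-argument `transform` can register them itself. Below is a minimal, self-contained sketch of that idea; it does not use the Kafka Streams API. `TransformerSupplierWithState` mirrors the interface proposed in the ticket, while `MiniBuilder`, `Suppliers`, the use of `Function` as a stand-in for `Transformer`, `Supplier<Object>` as a stand-in for a store factory, and the store name `example-store` are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical supplier that also declares the state stores its transformer owns.
// Function<V, R> stands in for Transformer; Supplier<Object> stands in for a store factory.
interface TransformerSupplierWithState<V, R> {
    Function<V, R> get();

    Map<String, Supplier<Object>> stateStores(); // store name -> store factory
}

// Hypothetical mini-builder: a single-argument transform() registers the supplier's
// stores itself, so the call site never names them or touches the topology.
final class MiniBuilder {
    private final Map<String, Supplier<Object>> stores = new LinkedHashMap<>();

    <V, R> Function<V, R> transform(TransformerSupplierWithState<V, R> supplier) {
        supplier.stateStores().forEach(stores::putIfAbsent); // add each store only once
        return supplier.get();
    }

    Set<String> storeNames() { return Collections.unmodifiableSet(stores.keySet()); }
}

final class Suppliers {
    private Suppliers() { }

    // Hypothetical factory method: wraps the business logic and declares one store
    // ("example-store", an invented name) that the caller never has to mention.
    static <V, R> TransformerSupplierWithState<V, R> transformerSupplierWithState(Function<V, R> logic) {
        Supplier<Object> storeFactory = () -> new HashMap<String, Object>();
        return new TransformerSupplierWithState<V, R>() {
            @Override public Function<V, R> get() { return logic; }

            @Override public Map<String, Supplier<Object>> stateStores() {
                return Collections.singletonMap("example-store", storeFactory);
            }
        };
    }
}
```

With this shape the call site reduces to `builder.transform(transformerSupplierWithState(val -> businessLogic(val)))`, keeping the lambda in-line. Using `putIfAbsent` lets the same supplier appear at several points without registering its store twice; whether a real builder should instead reject duplicate store names is a design choice this sketch leaves open.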
This issue is obviously exacerbated when the "fluent block" is much longer than this example: it gets worse the farther away {{val -> businessLogic(val)}} is from {{KStream::transform}}. An improvement: {code:java} builder.stream("in.topic") .transform(transformerSupplierWithState(topology, val -> businessLogic(val))) .to("out.topic");{code} Which implies the existence of a {{KStream::transform}} that takes a single argument that adheres to this interface: {code:java} interface TransformerSupplierWithState { Transformer get(); String[] stateStoreNames(); }{code} Or better yet, I wouldn't have to pass in the topology: the caller of {{TransformerSupplierWithState}} could also handle the job of "adding" its state stores to the topology: {code:java} interface TransformerSupplierWithState { Transformer get(); Map stateStores(); }{code} Which would enable my ideal: {code:java} builder.stream("in.topic") .transform(transformerSupplierWithState(val -> businessLogic(val))) .to("out.topic");{code} I think this would be a huge improvement in the usability of low-level processors with the high-level DSL. Please let me know if I'm missing something as to why this cannot or should not happen, or if there is a better forum for this suggestion (presumably it would require a KIP?). I'd be happy to build it as well if there is a chance of it being merged; it doesn't seem like a huge challenge to me. was: I have found that when writing "low level" {{Processors}} and {{Transformers}} that are stateful, often I want these processors to "own" one or more state stores, the details of which are not important to the business logic of the application. However, when incorporating these into the topologies defined by the high level API, using {{KStream::transform}} or {{KStream::process}}, I'm forced to specify the stores so the topology is wired up correctly.
This creates an unfortunate pattern where the {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the pattern I've been following) holds the information about the names of the state stores, must be defined above the "high level" "fluent API"-style pipeline, which makes it hard to understand the business logic data flow. What I currently have to do: {code:java} TransformerSupplier transformerSupplier = new TransformerSupplierWithState(topology, val -> businessLogic(val)); builder.stream("in.topic") .transform(transformerSupplier, transformerSupplier.stateStoreNames()) .to("out.topic");{code} I have to both define the {{TransformerSupplier}} above the "fluent block", and pass the topology in so I can call {{topology.addStateStore()}} inside the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the state store names are for that point in the topology. The lambda {{val -> businessLogic(val)}} is really what I want to see in-line because that's the crux of what is happening, along with the name of some factory method describing what the transformer is