[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-01 Thread Tobias Johansson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671223#comment-16671223
 ] 

Tobias Johansson commented on KAFKA-7192:
-

[~mjsax] thanks! Is the issue only on the client side (would an updated client 
lib fix it)? And would disabling RocksDB mitigate the issue in the meantime? I 
haven't been able to reproduce the issue with RocksDB disabled.
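
For reference, "disabling RocksDB" here means materializing the state in memory 
instead of in RocksDB. A minimal sketch, assuming default serdes; the topic and 
store names are illustrative:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStoreExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Materialize the table in an in-memory store rather than the default RocksDB store.
        builder.table("input-topic",
                Materialized.as(Stores.inMemoryKeyValueStore("my-store")));
    }
}
{code}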

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the Critical 
> priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-01 Thread Sarvesh Tamba (JIRA)
Sarvesh Tamba created KAFKA-7580:


 Summary: Unit Test 
'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root 
user
 Key: KAFKA-7580
 URL: https://issues.apache.org/jira/browse/KAFKA-7580
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
 Environment: Ubuntu 16.04.3 LTS
Reporter: Sarvesh Tamba


Created a non-root user and ran the following command to execute the failing 
unit test:-
./gradlew streams:unitTest --tests 
org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir

For a root user, the test case fails:-
=
> Task :streams:testClasses UP-TO-DATE

> Task :streams:unitTest

org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
 java.lang.AssertionError: Expected exception: 
org.apache.kafka.streams.errors.ProcessorStateException

1 test completed, 1 failed

> Task :streams:unitTest FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':streams:unitTest'.
> There were failing tests. See the report at: 
> file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 20s
26 actionable tasks: 2 executed, 24 up-to-date
=

However, for a non-root user the test case passes successfully:-
=
> Task :streams:testClasses

> Task :streams:unitTest

org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED

BUILD SUCCESSFUL in 45s
26 actionable tasks: 4 executed, 22 up-to-date

=

The failing unit test - 
"shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
file directory and sets it as readOnly. The unit test is intended to throw an 
exception - "ProcessorStateException", when the readOnly temporary file 
directory is opened/accessed.

By default, a non-root user is not allowed to open/access a readOnly file 
directory, so the unit test rightly throws the error/exception (which is the 
intention of the unit test) and passes for non-root users.
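
The root/non-root difference is easy to reproduce standalone; a minimal sketch 
(hypothetical demo class, not the actual test code):

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class ReadOnlyDirDemo {
    public static void main(String[] args) throws IOException {
        File readOnlyDir = Files.createTempDirectory("readOnlyDir").toFile();
        readOnlyDir.setReadOnly();
        File child = new File(readOnlyDir, "child");
        // As a non-root user this prints false (permission denied), so a test
        // expecting a failure passes; as root it prints true, because root
        // bypasses the permission check, and such a test fails.
        System.out.println("mkdir succeeded: " + child.mkdir());
    }
}
{code}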

sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
 mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
 
 sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
 ls: cannot access '/tmp/readOnlyDir/..': Permission denied
 ls: cannot access '/tmp/readOnlyDir/.': Permission denied
 ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
 ls: cannot access '/tmp/readOnlyDir/child': Permission denied
 total 0
 d? ? ? ? ? ? ./
 d? ? ? ? ? ? ../
 d? ? ? ? ? ? child/
 d? ? ? ? ? ? kid/

However, by default, the root user can access any file in the system:-
 root@p006vm18:/tmp# ll /tmp/readOnlyDir/
 total 112
 dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
 drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
 drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
 drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
 
 root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
 
 root@p006vm18:/tmp# ll /tmp/readOnlyDir/
 total 116
 dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
 drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
 drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
 drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
 drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/

Hence the unit test does not throw the expected exception - 
"ProcessorStateException" - when the readOnly temporary file directory is 
opened, and the unit test consequently fails for a root user.

Two approaches for resolving this failing unit test case:-
1.) Run the unit tests as a non-root user (simplest).
2.) If running the unit tests as the root user, make the temporary file 
directory immutable in the unit test code and then test for the exception 
(needs code changes in the unit tests; see the sketch after the list below):-

root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
mkdir: cannot create directory '/tmp/readOnlyDir/grandparent': Permission denied

A file with an immutable attribute cannot be:
- Modified
- Deleted
- Renamed
- Linked to (no soft or hard links can be created), even by the root user.
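
A minimal sketch of approach 2.), assuming a Linux filesystem where chattr is 
available; the helper name is illustrative:

{code:java}
import java.io.File;
import java.io.IOException;

public class ImmutableDirHelper {
    // Runs `chattr +i <dir>` (or `chattr -i` to undo). Requires root, which is
    // exactly the case where the read-only permission check alone is insufficient.
    static void setImmutable(File dir, boolean immutable) throws IOException, InterruptedException {
        String flag = immutable ? "+i" : "-i";
        Process p = new ProcessBuilder("chattr", flag, dir.getAbsolutePath()).start();
        if (p.waitFor() != 0) {
            throw new IOException("chattr " + flag + " failed for " + dir);
        }
    }
}
{code}

The test would call setImmutable(dir, true) before opening the store and reset 
it with setImmutable(dir, false) in a finally block so the directory can still 
be cleaned up afterwards.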

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7581) Issues in building kafka using gradle on a Ubuntu based docker container

2018-11-01 Thread Sarvesh Tamba (JIRA)
Sarvesh Tamba created KAFKA-7581:


 Summary: Issues in building kafka using gradle on a Ubuntu based 
docker container
 Key: KAFKA-7581
 URL: https://issues.apache.org/jira/browse/KAFKA-7581
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
 Environment: Ubuntu 16.04.3 LTS
Reporter: Sarvesh Tamba


The following issues are seen when Kafka is built using Gradle on an 
Ubuntu-based Docker container:-

/kafka-gradle/kafka-2.0.0/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:177: File name too long
This can happen on some encrypted or legacy file systems. Please see SI-3623 for more details.
 .foreach { txnMetadataCacheEntry =>
 ^
56 warnings found
one error found

> Task :core:compileScala FAILED

FAILURE: Build failed with an exception.

* What went wrong:
 Execution failed for task ':core:compileScala'.
 > Compilation failed



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Yanjie Wang (JIRA)
Yanjie Wang created KAFKA-7582:
--

 Summary: MetadataRequest don't support autoCreateTopic below 
version 4.
 Key: KAFKA-7582
 URL: https://issues.apache.org/jira/browse/KAFKA-7582
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Yanjie Wang


This line of MetadataRequest:

"if (!allowAutoTopicCreation && version < 4)" should have the "!" removed.

Location: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]

I hope that I am not mistaken, as I am a Java beginner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671422#comment-16671422
 ] 

Ismael Juma commented on KAFKA-7582:


That line looks correct to me.

> MetadataRequest don't support autoCreateTopic below version 4.
> --
>
> Key: KAFKA-7582
> URL: https://issues.apache.org/jira/browse/KAFKA-7582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Yanjie Wang
>Priority: Minor
>
> This line of MetadataRequest.
> ''if (!allowAutoTopicCreation && version < 4)" should remove "!".
> Location: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]
> I hope that I am not mistaken. 
> As a Java beginner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Yanjie Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanjie Wang updated KAFKA-7582:
---
Description: 
This line of MetadataRequest.

''if (\!allowAutoTopicCreation && version < 4)" should remove "!".

Location: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]

I hope that I am not mistaken. 

As a Java beginner

  was:
This line of MetadataRequest.

''if (!allowAutoTopicCreation && version < 4)" should remove "!".

Location: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]

I hope that I am not mistaken. 

As a Java beginner


> MetadataRequest don't support autoCreateTopic below version 4.
> --
>
> Key: KAFKA-7582
> URL: https://issues.apache.org/jira/browse/KAFKA-7582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Yanjie Wang
>Priority: Minor
>
> This line of MetadataRequest.
> ''if (\!allowAutoTopicCreation && version < 4)" should remove "!".
> Location: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]
> I hope that I am not mistaken. 
> As a Java beginner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Yanjie Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671468#comment-16671468
 ] 

Yanjie Wang commented on KAFKA-7582:


[~ijuma]

MetadataRequest versions older than 4 don't support the allowAutoTopicCreation 
field, so we should raise an error when allowAutoTopicCreation is set and 
version < 4.


> MetadataRequest don't support autoCreateTopic below version 4.
> --
>
> Key: KAFKA-7582
> URL: https://issues.apache.org/jira/browse/KAFKA-7582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Yanjie Wang
>Priority: Minor
>
> This line of MetadataRequest.
> ''if (\!allowAutoTopicCreation && version < 4)" should remove "!".
> Location: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]
> I hope that I am not mistaken. 
> As a Java beginner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7420) Global stores should be guarded as read-only for regular tasks

2018-11-01 Thread Nikolay Izhikov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671503#comment-16671503
 ] 

Nikolay Izhikov commented on KAFKA-7420:


[~mjsax]

> Does this help?

Yes, thanks. I'll try to provide a PR in the next few days.

> Global stores should be guarded as read-only for regular tasks
> --
>
> Key: KAFKA-7420
> URL: https://issues.apache.org/jira/browse/KAFKA-7420
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and only return either _(a)_ a read-only 
> store, or _(b)_ wrap the store but throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway) (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short though—however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7579) System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure

2018-11-01 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671607#comment-16671607
 ] 

Manikumar commented on KAFKA-7579:
--

This issue is related to KAFKA-7561

> System Test Failure - 
> security_test.SecurityTest.test_client_ssl_endpoint_validation_failure
> 
>
> Key: KAFKA-7579
> URL: https://issues.apache.org/jira/browse/KAFKA-7579
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1
>Reporter: Stanislav Kozlovski
>Priority: Blocker
>
> The security_test.SecurityTest.test_client_ssl_endpoint_validation_failure 
> test with security_protocol=SSL fails to pass
> {code:java}
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.7.1
> session_id: 2018-10-31--002
> run time: 2 minutes 12.452 seconds
> tests run: 2  passed: 1  failed: 1  ignored: 0
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=PLAINTEXT.security_protocol=SSL
> status: FAIL
> run time: 1 minute 2.149 seconds
>
> Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 185, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 324, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/security_test.py", line 114, in test_client_ssl_endpoint_validation_failure
>     self.consumer.stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py", line 80, in stop
>     super(BackgroundThreadService, self).stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/service.py", line 278, in stop
>     self.stop_node(node)
>   File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 254, in stop_node
>     (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=SSL.security_protocol=PLAINTEXT
> status: PASS
> run time: 1 minute 10.144 seconds
>
> ducker-ak test failed
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7420) Global stores should be guarded as read-only for regular tasks

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671610#comment-16671610
 ] 

ASF GitHub Bot commented on KAFKA-7420:
---

nizhikov opened a new pull request #5865: KAFKA-7420: Global store surrounded 
by read only implementation
URL: https://github.com/apache/kafka/pull/5865
 
 
   Global store wrapped by a read-only `KeyValueStore` implementation.
   All methods that try to write into the store will throw 
`UnsupportedOperationException`.
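   A minimal sketch of such a wrapper (illustrative; the actual PR may differ):

{code:java}
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Delegates reads to the underlying global store and rejects all writes.
public class ReadOnlyKeyValueStoreFacade<K, V> {
    private final KeyValueStore<K, V> inner;

    public ReadOnlyKeyValueStoreFacade(final KeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    public V get(final K key) {
        return inner.get(key);
    }

    public KeyValueIterator<K, V> range(final K from, final K to) {
        return inner.range(from, to);
    }

    // Regular tasks must not mutate a global store.
    public void put(final K key, final V value) {
        throw new UnsupportedOperationException("Global store is read-only for regular tasks");
    }

    public V delete(final K key) {
        throw new UnsupportedOperationException("Global store is read-only for regular tasks");
    }
}
{code}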
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Global stores should be guarded as read-only for regular tasks
> --
>
> Key: KAFKA-7420
> URL: https://issues.apache.org/jira/browse/KAFKA-7420
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and only return either _(a)_ a read-only 
> store, or _(b)_ wrap the store but throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway) (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short though—however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-11-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671625#comment-16671625
 ] 

点儿郎当 commented on KAFKA-7481:
-

Kafka version 1.0

[2018-11-01 21:37:26,250] ERROR [Controller id=13 epoch=8] Initiated state change for partition topic.cloud.service.bigdata.news_contents_submits_stats.content-6 from OnlinePartition to OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: [Controller id=13 epoch=8] Encountered error while electing leader for partition topic.cloud.service.bigdata.news_contents_submits_stats.content-6 due to: Preferred replica 12 for partition topic.cloud.service.bigdata.news_contents_submits_stats.content-6 is either not alive or not in the isr. Current leader and ISR: [{"leader":13,"leader_epoch":74,"isr":[13]}]
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:324)

How did this happen, and how can it be resolved?

Code:
def selectLeader(topicAndPartition: TopicAndPartition,
                 currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
  val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
  val preferredReplica = assignedReplicas.head
  // check if preferred replica is the current leader
  val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
  if (currentLeader == preferredReplica) {
    throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
      .format(preferredReplica, topicAndPartition))
  } else {
    info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
      " Triggering preferred replica leader election")
    // check if preferred replica is not the current leader and is alive and in the isr
    if (controllerContext.isReplicaOnline(preferredReplica, topicAndPartition) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
      val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica)
      (newLeaderAndIsr, assignedReplicas)
    } else {
      throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition " +
        s"is either not alive or not in the isr. Current leader and ISR: [$currentLeaderAndIsr]")
    }
  }
}

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7420) Global stores should be guarded as read-only for regular tasks

2018-11-01 Thread Nikolay Izhikov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671639#comment-16671639
 ] 

Nikolay Izhikov commented on KAFKA-7420:


[~mjsax]

I've implemented an initial PR. 

Can you take a look?

> Global stores should be guarded as read-only for regular tasks
> --
>
> Key: KAFKA-7420
> URL: https://issues.apache.org/jira/browse/KAFKA-7420
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and only return either _(a)_ a read-only 
> store, or _(b)_ wrap the store but throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway) (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short though—however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7579) System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure

2018-11-01 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671733#comment-16671733
 ] 

Manikumar edited comment on KAFKA-7579 at 11/1/18 3:15 PM:
---

Tests are passing locally after increasing the console consumer timeout. It 
looks like the issue started after [https://github.com/apache/kafka/pull/5735].
I am going to lower the priority to unblock the 2.0.1 release. Let me know if 
there are any concerns.


was (Author: omkreddy):
tests are passing locally after increasing console consume timeout. looks like 
the issue started after [https://github.com/apache/kafka/pull/5735]
I am going lower the priority to unblock 2.0.1 release. Let me know if any 
concerns.

> System Test Failure - 
> security_test.SecurityTest.test_client_ssl_endpoint_validation_failure
> 
>
> Key: KAFKA-7579
> URL: https://issues.apache.org/jira/browse/KAFKA-7579
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1
>Reporter: Stanislav Kozlovski
>Priority: Blocker
>
> The security_test.SecurityTest.test_client_ssl_endpoint_validation_failure 
> test with security_protocol=SSL fails to pass
> {code:java}
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.7.1
> session_id: 2018-10-31--002
> run time: 2 minutes 12.452 seconds
> tests run: 2  passed: 1  failed: 1  ignored: 0
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=PLAINTEXT.security_protocol=SSL
> status: FAIL
> run time: 1 minute 2.149 seconds
>
> Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 185, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 324, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/security_test.py", line 114, in test_client_ssl_endpoint_validation_failure
>     self.consumer.stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py", line 80, in stop
>     super(BackgroundThreadService, self).stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/service.py", line 278, in stop
>     self.stop_node(node)
>   File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 254, in stop_node
>     (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=SSL.security_protocol=PLAINTEXT
> status: PASS
> run time: 1 minute 10.144 seconds
>
> ducker-ak test failed
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7579) System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure

2018-11-01 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671733#comment-16671733
 ] 

Manikumar commented on KAFKA-7579:
--

tests are passing locally after increasing console consume timeout. looks like 
the issue started after [https://github.com/apache/kafka/pull/5735]
I am going lower the priority to unblock 2.0.1 release. Let me know if any 
concerns.

> System Test Failure - 
> security_test.SecurityTest.test_client_ssl_endpoint_validation_failure
> 
>
> Key: KAFKA-7579
> URL: https://issues.apache.org/jira/browse/KAFKA-7579
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1
>Reporter: Stanislav Kozlovski
>Priority: Blocker
>
> The security_test.SecurityTest.test_client_ssl_endpoint_validation_failure 
> test with security_protocol=SSL fails to pass
> {code:java}
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.7.1
> session_id: 2018-10-31--002
> run time: 2 minutes 12.452 seconds
> tests run: 2  passed: 1  failed: 1  ignored: 0
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=PLAINTEXT.security_protocol=SSL
> status: FAIL
> run time: 1 minute 2.149 seconds
>
> Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 185, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 324, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/security_test.py", line 114, in test_client_ssl_endpoint_validation_failure
>     self.consumer.stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py", line 80, in stop
>     super(BackgroundThreadService, self).stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/service.py", line 278, in stop
>     self.stop_node(node)
>   File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 254, in stop_node
>     (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=SSL.security_protocol=PLAINTEXT
> status: PASS
> run time: 1 minute 10.144 seconds
>
> ducker-ak test failed
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671787#comment-16671787
 ] 

Ismael Juma commented on KAFKA-7582:


MetadataRequest versions older than 4 treat all requests as 
`allowAutoTopicCreation == true`, so we only fail if that value is set to false 
and version < 4. So, there's no bug here. Have you experienced any problem 
related to this? If not, I suggest closing.
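
For context, the guard under discussion reduces to the following simplified 
sketch (not the verbatim Kafka source; the real constructor throws an 
UnsupportedVersionException):

{code:java}
public class MetadataVersionGuard {
    static void check(boolean allowAutoTopicCreation, short version) {
        // Versions < 4 cannot carry the flag on the wire and the broker always
        // behaves as if it were true, so only "false" on an old version is
        // unrepresentable and must be rejected.
        if (!allowAutoTopicCreation && version < 4) {
            throw new IllegalStateException(
                "MetadataRequest versions older than 4 don't support the allowAutoTopicCreation field");
        }
    }

    public static void main(String[] args) {
        check(true, (short) 3);   // fine: old versions implicitly allow auto-creation
        check(false, (short) 4);  // fine: v4+ can encode the flag explicitly
        check(false, (short) 3);  // throws: "false" cannot be expressed before v4
    }
}
{code}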

> MetadataRequest don't support autoCreateTopic below version 4.
> --
>
> Key: KAFKA-7582
> URL: https://issues.apache.org/jira/browse/KAFKA-7582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Yanjie Wang
>Priority: Minor
>
> This line of MetadataRequest.
> ''if (\!allowAutoTopicCreation && version < 4)" should remove "!".
> Location: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]
> I hope that I am not mistaken. 
> As a Java beginner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7538) Improve locking model used to update ISRs and HW

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671801#comment-16671801
 ] 

ASF GitHub Bot commented on KAFKA-7538:
---

rajinisivaram opened a new pull request #5866: KAFKA-7538: Reduce lock 
contention for Partition ISR lock
URL: https://github.com/apache/kafka/pull/5866
 
 
   Check for ISR updates using ISR read lock and acquire ISR write lock only if 
ISR needs to be updated. This avoids lock contention between request handler 
threads processing log appends on the leader holding the ISR read lock and 
request handler threads processing replica fetch requests that check/update ISR.
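   A minimal sketch of the locking pattern described above (illustrative; not 
the actual Partition code):

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class IsrState {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Set<Integer> isr = new HashSet<>();

    boolean maybeExpandIsr(int replicaId) {
        // Common case: only the read lock is taken to check whether an update is needed.
        lock.readLock().lock();
        try {
            if (isr.contains(replicaId))
                return false;
        } finally {
            lock.readLock().unlock();
        }
        // ReentrantReadWriteLock cannot upgrade read -> write, so release first,
        // then acquire the write lock and re-check before mutating.
        lock.writeLock().lock();
        try {
            return isr.add(replicaId);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}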
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve locking model used to update ISRs and HW
> 
>
> Key: KAFKA-7538
> URL: https://issues.apache.org/jira/browse/KAFKA-7538
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0
>
>
> We currently use a ReadWriteLock in Partition to update ISRs and high water 
> mark for the partition. This can result in severe lock contention if there 
> are multiple producers writing a large amount of data into a single partition.
> The current locking model is:
>  # read lock while appending to log on every Produce request on the request 
> handler thread
>  # write lock on leader change, updating ISRs etc. on request handler or 
> scheduler thread
>  # write lock on every replica fetch request to check if ISRs need to be 
> updated and to update HW and ISR on the request handler thread
> 2) is infrequent, but 1) and 3) may be frequent and can result in lock 
> contention. If there are lots of produce requests to a partition from 
> multiple processes, on the leader broker we may see:
>  # one slow log append locks up one request thread for that produce while 
> holding onto the read lock
>  # (replicationFactor-1) request threads can be blocked waiting for write 
> lock to process replica fetch request
>  # potentially several other request threads processing Produce may be queued 
> up to acquire read lock because of the waiting writers.
> In a thread dump with this issue, we noticed several request threads blocked 
> waiting for the write lock, possibly due to replication fetch retries.
>  
> Possible fixes:
>  # Process `Partition#maybeExpandIsr` on a single scheduler thread similar to 
> `Partition#maybeShrinkIsr` so that only a single thread is blocked on the 
> write lock. But this will delay updating ISRs and HW.
>  # Change locking in `Partition#maybeExpandIsr` so that only read lock is 
> acquired to check if ISR needs updating and write lock is acquired only to 
> update ISRs. Also use a different lock for updating HW (perhaps just the 
> Partition object lock) so that typical replica fetch requests complete 
> without acquiring Partition write lock on the request handler thread.
> I will submit a PR for 2), but other suggestions to fix this are welcome.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7582) MetadataRequest don't support autoCreateTopic below version 4.

2018-11-01 Thread Yanjie Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yanjie Wang resolved KAFKA-7582.

Resolution: Not A Bug

> MetadataRequest don't support autoCreateTopic below version 4.
> --
>
> Key: KAFKA-7582
> URL: https://issues.apache.org/jira/browse/KAFKA-7582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Yanjie Wang
>Priority: Minor
>
> This line of MetadataRequest.
> ''if (\!allowAutoTopicCreation && version < 4)" should remove "!".
> Location: 
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L110]
> I hope that I am not mistaken. 
> As a Java beginner



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-01 Thread Daren Thomas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671844#comment-16671844
 ] 

Daren Thomas commented on KAFKA-7577:
-

I see similar behavior with the right side of the Left Join.  If the message 
value is null, the Left Join is not processed.

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.
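
A minimal topology matching the described scenario (topic names are illustrative):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class LeftJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> left = builder.table("left-topic");
        KTable<String, String> right = builder.table("right-topic");
        // Per the cited KTable-KTable left-join semantics, a tombstone (null value)
        // arriving on "left-topic" should produce a tombstone on "output-topic";
        // the reported bug is that it does not.
        left.leftJoin(right, (l, r) -> l + "," + r)
            .toStream()
            .to("output-topic");
    }
}
{code}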



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7579) System Test Failure - security_test.SecurityTest.test_client_ssl_endpoint_validation_failure

2018-11-01 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-7579:
-
Fix Version/s: 2.0.2

> System Test Failure - 
> security_test.SecurityTest.test_client_ssl_endpoint_validation_failure
> 
>
> Key: KAFKA-7579
> URL: https://issues.apache.org/jira/browse/KAFKA-7579
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.1
>Reporter: Stanislav Kozlovski
>Priority: Blocker
> Fix For: 2.0.2
>
>
> The security_test.SecurityTest.test_client_ssl_endpoint_validation_failure 
> test with security_protocol=SSL fails to pass
> {code:java}
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.7.1
> session_id: 2018-10-31--002
> run time: 2 minutes 12.452 seconds
> tests run: 2  passed: 1  failed: 1  ignored: 0
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=PLAINTEXT.security_protocol=SSL
> status: FAIL
> run time: 1 minute 2.149 seconds
>
> Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 185, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 324, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/security_test.py", line 114, in test_client_ssl_endpoint_validation_failure
>     self.consumer.stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/background_thread.py", line 80, in stop
>     super(BackgroundThreadService, self).stop()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/services/service.py", line 278, in stop
>     self.stop_node(node)
>   File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 254, in stop_node
>     (str(node.account), str(self.stop_timeout_sec))
> AssertionError: Node ducker@ducker05: did not stop within the specified timeout of 15 seconds
>
> test_id: kafkatest.tests.core.security_test.SecurityTest.test_client_ssl_endpoint_validation_failure.interbroker_security_protocol=SSL.security_protocol=PLAINTEXT
> status: PASS
> run time: 1 minute 10.144 seconds
>
> ducker-ak test failed
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6890) Add connector level configurability for producer/consumer client configs

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671976#comment-16671976
 ] 

Randall Hauch commented on KAFKA-6890:
--

First of all, we're going to need a KIP for this, since it will change the 
configurations exposed to users. We need to make sure that this doesn't break 
backward compatibility, etc.

Second, I like this change and the ability to override some of the 
producer/consumer behavior. I also think that generally it's better to give 
users more control, but I'm actually very concerned about allowing connector 
configurations to override {{bootstrap servers}}. Yes, I can totally see the 
benefits, but this also allows users who don't know what they're doing to 
really mess things up. Some people will undoubtedly think they *have* to define 
these properties in every connector, and then when they change their bootstrap 
URL in the worker they might forget to update it in their connector configs. 
Or, worse yet, having the internal topics in one cluster and the connector 
records in a different cluster will very much conflict with any hopes of adding 
EOS to source connectors.

> Add connector level configurability for producer/consumer client configs
> 
>
> Key: KAFKA-6890
> URL: https://issues.apache.org/jira/browse/KAFKA-6890
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Allen Tang
>Priority: Minor
>
> Right now, each source connector and sink connector inherit their client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should also provide connector-level overrides whereby connector properties 
> that are prefixed with "producer." and "consumer." are used to feed into the 
> producer and consumer clients embedded within source and sink connectors 
> respectively. The prefixes will be removed via a String#substring() call, and 
> the remainder of the connector property key will be used as the client 
> configuration key. The value is fed directly to the client as the 
> configuration value.
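
A minimal sketch of the proposed prefix stripping (hypothetical helper, not 
Connect source):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ClientConfigOverrides {
    // Turns connector properties such as "producer.acks" into client configs such as "acks".
    static Map<String, Object> extract(String prefix, Map<String, String> connectorProps) {
        Map<String, Object> clientProps = new HashMap<>();
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                clientProps.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return clientProps;
    }
}
{code}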



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6890) Add connector level configurability for producer/consumer client configs

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671976#comment-16671976
 ] 

Randall Hauch edited comment on KAFKA-6890 at 11/1/18 6:32 PM:
---

First of all, we're going to need a KIP for this, since it will change the 
configurations exposed to users. We need to make sure that this doesn't break 
backward compatibility, etc.

Second, I like this change and the ability to override some of the 
producer/consumer behavior. I also think that generally it's better to give 
users more control, but I'm actually very concerned about allowing connector 
configurations to override {{bootstrap servers}}. Yes, I can totally see 
several valid use cases and benefits, but this also allows users who don't know 
what they're doing to really mess things up. Some people will undoubtedly think 
they *have* to define these properties in every connector, and then when they 
change their bootstrap URL in the worker they might forget to update it in 
their connector configs. Or, worse yet, having the internal topics in one 
cluster and the connector records in a different cluster will very much 
conflict with any hopes of adding EOS to source connectors.


was (Author: rhauch):
First of all, we're going to need a KIP for this, since it will change the 
configurations exposed to users. We need to make sure that this doesn't break 
backward compatibility, etc.

Second, I like this change and the ability to override some of the 
producer/consumer behavior. I also think that generally it's better to give 
users more control, but I'm actually very concerned about allowing connector 
configurations to override {{bootstrap servers}}. Yes, I can totally see the 
benefits, but this also allows users who don't know what they're doing to 
really mess things up. Some people will undoubtedly think they *have* to define 
these properties in every connector, and then when they change their bootstrap 
URL in the worker they might forget to update it in their connector configs. 
Or, worse yet, having the internal topics in one cluster and the connector 
records in a different cluster will very much conflict with any hopes of adding 
EOS to source connectors.

> Add connector level configurability for producer/consumer client configs
> 
>
> Key: KAFKA-6890
> URL: https://issues.apache.org/jira/browse/KAFKA-6890
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Allen Tang
>Priority: Minor
>
> Right now, each source connector and sink connector inherit their client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should also provide connector-level overrides whereby connector properties 
> that are prefixed with "producer." and "consumer." are used to feed into the 
> producer and consumer clients embedded within source and sink connectors 
> respectively. The prefixes will be removed via a String#substring() call, and 
> the remainder of the connector property key will be used as the client 
> configuration key. The value is fed directly to the client as the 
> configuration value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672075#comment-16672075
 ] 

Randall Hauch commented on KAFKA-7509:
--

Okay, I am now at the point where I think this "hygiene" approach is completely 
flawed, and that instead the problem is that we're logging as warnings all 
"unused" properties in the producer, consumer, and the KafkaAdminClient.

Let me back up. Based upon the discussion above, I modified my approach to 
attempt to retain all of the configuration properties that are known to the 
ProducerConfig, ConsumerConfig, and AdminClientConfig, where "known" properties 
include:
* all of those whose name is in the set returned by each of the config's 
configNames(), or
* any property that can be passed to an interceptor, key or value 
(de)serializer, metrics reporter, or partitioner that is instantiated by the 
client.

That last bit is the problem: the properties to the clients' interceptors, 
serdes, metrics reporters, and partitioners are all unprefixed, so it is 
impossible to know which properties are needed by any of the specified 
implementations.

IOW, the properties passed to a producer, consumer, or admin client must be 
able to include any property that is needed by any of these custom components. 
And, because the getConfiguredComponent method used by the clients passes the 
Map to the component's configure method, the AbstractConfig 
doesn't know whether those properties are even used by the component. So, if 
the AbstractConfig doesn't really even know whether a property is really 
used or unused, why are the Producer, Consumer, and KafkaAdminClient even 
bothering to log "unused" properties?

h3. Bottom line
I now posit that the only way to accurately eliminate these warnings is to 
remove the config.logUnused() call from the Producer, Consumer, and 
KafkaAdminClient, or to change AbstractConfig.logUnused() to log these at the 
INFO (or DEBUG) level.
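
To see why "unused" detection cannot work here, consider a toy reduction 
(hypothetical class, not the AbstractConfig source):

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ToyConfig {
    private final Map<String, Object> originals;
    private final Set<String> used = new HashSet<>();

    ToyConfig(Map<String, Object> originals) {
        this.originals = new HashMap<>(originals);
    }

    // Values fetched individually are marked "used".
    Object get(String key) {
        used.add(key);
        return originals.get(key);
    }

    // configure-style access: the whole map is handed to a component, so the
    // config never learns which keys the component actually read.
    Map<String, Object> originals() {
        return new HashMap<>(originals);
    }

    Set<String> unused() {
        Set<String> u = new HashSet<>(originals.keySet());
        u.removeAll(used);
        return u; // custom-component keys land here and get logged as warnings
    }
}
{code}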

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672080#comment-16672080
 ] 

ASF GitHub Bot commented on KAFKA-7509:
---

rhauch closed pull request #5802: KAFKA-7509: Reduce unnecessary and misleading 
“configuration supplied but not known” warning messages in Connect
URL: https://github.com/apache/kafka/pull/5802
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 058c491672a..ebd10d45689 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -185,6 +185,20 @@ protected AdminClientConfig(Map<?, ?> props, boolean doLog) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name) || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ddd6e06c713..c62c574ad80 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -507,6 +507,22 @@ protected ConsumerConfig(Map<?, ?> props, boolean doLog) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name)
+               || name.startsWith(KEY_DESERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_DESERIALIZER_CLASS_CONFIG)
+               || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 6142519c4dc..515a54888e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -392,6 +392,23 @@ public ProducerConfig(Map<String, Object> props) {
         return CONFIG.names();
     }
 
+    /**
+     * Return whether the given property name is a known configuration. This will consider valid any property that can be passed to
+     * instances of extensions, such as the {@link #METRIC_REPORTER_CLASSES_CONFIG metrics reporter}.
+     *
+     * @param name the property name
+     * @return true if the supplied name matches a known property, or false if it is unknown
+     */
+    public static boolean isKnownConfig(String name) {
+        if (name == null) {
+            return false;
+        }
+        return configNames().contains(name)
+               || name.startsWith(KEY_SERIALIZER_CLASS_CONFIG) || name.startsWith(VALUE_SERIALIZER_CLASS_CONFIG)
+               || name.startsWith(METRIC_REPORTER_CLASSES_CONFIG) || name.startsWith(INTERCEPTOR_CLASSES_CONFIG)
+               || name.startsWith(PARTITIONER_CLASS_CONFIG);
+    }
+
     public static void main(String[] args) {
         System.out.println(CONFIG.toHtmlTable());
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index df73a434d31..85a8d320d0c 100644
-

[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672089#comment-16672089
 ] 

ASF GitHub Bot commented on KAFKA-7509:
---

rhauch opened a new pull request #5867: KAFKA-7509: Logging unused configs as 
DEBUG rather than WARN
URL: https://github.com/apache/kafka/pull/5867
 
 
   The KafkaProducer, KafkaConsumer, and KafkaAdminClient all call 
`config.logUnused()` to log all of the unused configurations. These can be very 
misleading, because some conventional configuration properties for custom 
interceptors, key or value (de)serializers, metrics reporters, and partitioners 
are not actually defined by ConfigDefs and are thus all considered unused and 
logged as warnings.
   
   Also, some client applications (such as Connect) cannot determine the subset 
of properties that can be passed to one of these clients, because of these 
custom extensions. This causes extra properties to be logged as warnings, which 
is concerning for new users.
   
   These unused configurations should be logged at DEBUG level instead to 
reduce the number of unexpected warnings in many client application logs.
   
   ### Backporting
   This is a usability improvement that affects only this particular log 
message. As such, it may or may not be something that should be backported. 
Advice from committers would be appreciated.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.
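A minimal sketch of that workaround (hypothetical helper; ProducerConfig.configNames() is the real accessor):

{code:java}
// Keep only the properties that ProducerConfig's ConfigDef defines, so the
// producer has nothing "unused" to warn about. Analogous helpers would be
// needed for ConsumerConfig and AdminClientConfig.
static Map<String, Object> knownProducerConfigs(Map<String, Object> props) {
    Map<String, Object> filtered = new HashMap<>(props);
    filtered.keySet().retainAll(ProducerConfig.configNames());
    return filtered;
}
{code}

Note that this drops custom-extension properties, which is exactly why it is only a workaround.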



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7509:
-
Component/s: clients

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672095#comment-16672095
 ] 

Randall Hauch commented on KAFKA-6793:
--

My latest proposed fix on KAFKA-7509 would fix this by logging unused-config 
messages at DEBUG rather than WARN.

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> After upgrading Kafka Streams from 0.11.0.2 to 1.1.0, the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to Streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672097#comment-16672097
 ] 

Randall Hauch commented on KAFKA-7509:
--

The most recent [PR #5867|https://github.com/apache/kafka/pull/5867] would also 
fix KAFKA-6793 for Streams.

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control  whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7577:
---
Component/s: streams

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.
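For reference, a minimal sketch (hypothetical topic names) of a topology exercising the documented semantics; a tombstone arriving on the left table should then be forwarded by the left join:

{code:java}
// Left join of two KTables; per the documented semantics, a null value
// (tombstone) on "left-topic" should produce a tombstone on "output-topic".
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> left = builder.table("left-topic");
KTable<String, String> right = builder.table("right-topic");
left.leftJoin(right, (l, r) -> l + "," + r)
    .toStream()
    .to("output-topic");
{code}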



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7578) Kafka streams: add possibility to choose multiple output topics

2018-11-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7578:
---
Labels: needs-kip  (was: )

> Kafka streams: add possibility to choose multiple output topics 
> 
>
> Key: KAFKA-7578
> URL: https://issues.apache.org/jira/browse/KAFKA-7578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Taras Danylchuk
>Priority: Minor
>  Labels: needs-kip
>
> There is an awesome feature added in Kafka Streams 2.0 - the possibility to 
> choose the output topic for a topology dynamically - but in some cases it 
> could be useful to choose several topics within the same cluster.
> Personally, I met such a case: I needed to route a message, based on its 
> content and a routing configuration, to several topics.
> I've made a 'proposal' PR for this; unfortunately, I couldn't find a better way 
> to implement this:
> [https://github.com/apache/kafka/pull/5801]
> If this approach is OK and the improvement could be done in future versions, 
> please let me know and I'll finish the PR code.
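For reference, the 2.0 feature mentioned is per-record topic selection via a TopicNameExtractor, which picks exactly one topic per record (sketch with hypothetical String keys and topic names):

{code:java}
// Dynamic routing as it exists today: the extractor returns a single topic.
// The ticket asks for a way to fan out one record to several topics.
stream.to((key, value, recordContext) ->
        key.startsWith("alert") ? "alerts-topic" : "events-topic");
{code}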



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7578) Kafka streams: add possibility to choose multiple output topics

2018-11-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7578:
---
Issue Type: New Feature  (was: Improvement)

> Kafka streams: add possibility to choose multiple output topics 
> 
>
> Key: KAFKA-7578
> URL: https://issues.apache.org/jira/browse/KAFKA-7578
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Taras Danylchuk
>Priority: Minor
>  Labels: needs-kip
>
> There is an awesome feature added in Kafka Streams 2.0 - the possibility to 
> choose the output topic for a topology dynamically - but in some cases it 
> could be useful to choose several topics within the same cluster.
> Personally, I met such a case: I needed to route a message, based on its 
> content and a routing configuration, to several topics.
> I've made a 'proposal' PR for this; unfortunately, I couldn't find a better way 
> to implement this:
> [https://github.com/apache/kafka/pull/5801]
> If this approach is OK and the improvement could be done in future versions, 
> please let me know and I'll finish the PR code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7578) Kafka streams: add possibility to choose multiple output topics

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672190#comment-16672190
 ] 

Matthias J. Sax commented on KAFKA-7578:


Thanks for creating this ticket. Note that this is a public API change, and 
thus requires a KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

Let us know if you have any questions regarding the KIP process.

> Kafka streams: add possibility to choose multiple output topics 
> 
>
> Key: KAFKA-7578
> URL: https://issues.apache.org/jira/browse/KAFKA-7578
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Taras Danylchuk
>Priority: Minor
>  Labels: needs-kip
>
> There is an awesome feature added in Kafka Streams 2.0 - the possibility to 
> choose the output topic for a topology dynamically - but in some cases it 
> could be useful to choose several topics within the same cluster.
> Personally, I met such a case: I needed to route a message, based on its 
> content and a routing configuration, to several topics.
> I've made a 'proposal' PR for this; unfortunately, I couldn't find a better way 
> to implement this:
> [https://github.com/apache/kafka/pull/5801]
> If this approach is OK and the improvement could be done in future versions, 
> please let me know and I'll finish the PR code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7580:
---
Component/s: streams

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Blocker
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes successfully:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is 
> the intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit tests as the root user, make the temporary directory 
> immutable in the unit test code and then test for the exception (needs code 
> changes in the unit tests):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
> mkdir: cannot create directory '/tmp/readOnlyDir/grandparent': Permission 
> denied
> A file with an immutable attr
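As a sketch of approach 2, assuming the test runs on Linux and can shell out to chattr (all names hypothetical):

{code:java}
// Make the temp directory immutable so that even root cannot create
// entries in it, then expect ProcessorStateException on open.
File dir = TestUtils.tempDirectory();
new ProcessBuilder("chattr", "+i", dir.getAbsolutePath()).inheritIO().start().waitFor();
try {
    // open the RocksDB store against 'dir' here and assert that
    // ProcessorStateException is thrown
} finally {
    // clear the flag so the directory can be cleaned up afterwards
    new ProcessBuilder("chattr", "-i", dir.getAbsolutePath()).inheritIO().start().waitFor();
}
{code}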

[jira] [Updated] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7580:
---
Priority: Minor  (was: Blocker)

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes successfully:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is 
> the intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit tests as the root user, make the temporary directory 
> immutable in the unit test code and then test for the exception (needs code 
> changes in the unit tests):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/grandparent
> mkdir: cannot create directory '/tmp/readOnlyDir/grandparent': Permission 
> denied
> A file with an immut

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672198#comment-16672198
 ] 

Matthias J. Sax commented on KAFKA-7580:


Thanks for reporting this. I am just wondering why this is a problem. Running 
tests as the root user is not common...

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Blocker
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes successfully:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is 
> the intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit tests as the root user, make the temporary directory 
> immutable in the unit test code and then test for the exception (needs code 
> changes in the unit tests):-
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyDir/
> root@p006vm18:/tmp# mkdi

[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1667#comment-1667
 ] 

Matthias J. Sax commented on KAFKA-7577:


Can you describe the test data you are using and your ValueJoiner 
implementation? It's unclear to me what your setup is, so we cannot yet reproduce 
the error.

Note that a tombstone message is only produced if there is anything to be 
deleted. Compare the semantics as described in the wiki: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KTable-KTableJoin]
 Note that there is no output for lines 1, 2, and 13, because the result table 
is empty in those cases and thus it's not required to send another tombstone 
(for line 13, the previous lines 11/12 already generated a tombstone).

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672226#comment-16672226
 ] 

Matthias J. Sax commented on KAFKA-7192:


Yes, it's a client side bug and related to RocksDB. If you run with in-memory 
store, you won't see this issue.

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672238#comment-16672238
 ] 

Matthias J. Sax commented on KAFKA-6793:


Thanks [~rhauch]. It makes sense to have a uniform strategy for the whole 
project. We should not fragment different components.

Should I comment on KAFKA-7509, or do you want to discuss it here?

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> When upgraded KafkaStreams from 0.11.0.2 to 1.1.0 the following warning log 
> started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to the streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-01 Thread Daren Thomas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672245#comment-16672245
 ] 

Daren Thomas commented on KAFKA-7577:
-

{quote}Note that a tombstone message is only produced if there is anything to 
be deleted.
{quote}
My test case is set up to perform line 3 and line 7 from the data.  The table 
is not empty when the tombstone is sent.

The database tables backing the topics used by the stream processor look like 
this:
 * Table X with xid (primary key) and data A or null
 * Table Y with yid (primary key), xid, and zid
 * Table Z with zid (primary key) and data B or null

The topology looks like this:
 * Repartition Y using groupBy() and aggregate()
 * Repartition Z using groupBy() and aggregate()
 * Perform Y.leftJoin(Z).  Repartition using groupBy and aggregate()
 * Repartition X using flatMap()
 * Perform X.leftJoin(YZ), convert results to a stream, and send to the output 
topic.  Intended result is [A, B] for the value.
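A rough sketch of the final join steps of that topology (all names hypothetical; the groupBy()/aggregate() repartition steps are omitted):

{code:java}
// Y left-joined with Z, then X left-joined with the YZ result; the
// intended output value is "[A, B]".
KTable<String, String> yz = y.leftJoin(z, (yv, zv) -> yv + "," + zv);
x.leftJoin(yz, (xv, yzv) -> "[" + xv + ", " + yzv + "]")
 .toStream()
 .to("output-topic");
{code}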

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> Observed behavior of Table-Table join with a Null Message does not match the 
> semantics described in the documentation 
> ([https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join]).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7568) Return leader epoch in ListOffsets responses

2018-11-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7568.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Return leader epoch in ListOffsets responses
> 
>
> Key: KAFKA-7568
> URL: https://issues.apache.org/jira/browse/KAFKA-7568
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.0
>
>
> This is part of KIP-320. The changes to the API have already been made, but 
> currently we return unknown epoch. We need to update the logic to search for 
> the epoch corresponding to a fetched offset in the leader epoch cache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7568) Return leader epoch in ListOffsets responses

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672252#comment-16672252
 ] 

ASF GitHub Bot commented on KAFKA-7568:
---

hachikuji closed pull request #5855: KAFKA-7568; Return leader epoch in 
ListOffsets response
URL: https://github.com/apache/kafka/pull/5855
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 3537fc34bb0..d723ba091b7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -32,6 +32,8 @@
 import java.nio.channels.GatheringByteChannel;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -320,7 +322,8 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
             for (Record record : batch) {
                 long timestamp = record.timestamp();
                 if (timestamp >= targetTimestamp && record.offset() >= startingOffset)
-                    return new TimestampAndOffset(timestamp, record.offset());
+                    return new TimestampAndOffset(timestamp, record.offset(),
+                            maybeLeaderEpoch(batch.partitionLeaderEpoch()));
             }
         }
     }
@@ -335,15 +338,23 @@ public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
     public TimestampAndOffset largestTimestampAfter(int startingPosition) {
         long maxTimestamp = RecordBatch.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;
+        int leaderEpochOfMaxTimestamp = RecordBatch.NO_PARTITION_LEADER_EPOCH;
 
         for (RecordBatch batch : batchesFrom(startingPosition)) {
             long timestamp = batch.maxTimestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
                 offsetOfMaxTimestamp = batch.lastOffset();
+                leaderEpochOfMaxTimestamp = batch.partitionLeaderEpoch();
             }
         }
-        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp);
+        return new TimestampAndOffset(maxTimestamp, offsetOfMaxTimestamp,
+                maybeLeaderEpoch(leaderEpochOfMaxTimestamp));
+    }
+
+    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
+                Optional.empty() : Optional.of(leaderEpoch);
     }
 
     /**
@@ -492,28 +503,27 @@ public String toString() {
     public static class TimestampAndOffset {
         public final long timestamp;
         public final long offset;
+        public final Optional<Integer> leaderEpoch;
 
-        public TimestampAndOffset(long timestamp, long offset) {
+        public TimestampAndOffset(long timestamp, long offset, Optional<Integer> leaderEpoch) {
             this.timestamp = timestamp;
             this.offset = offset;
+            this.leaderEpoch = leaderEpoch;
         }
 
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;
             if (o == null || getClass() != o.getClass()) return false;
-
             TimestampAndOffset that = (TimestampAndOffset) o;
-
-            if (timestamp != that.timestamp) return false;
-            return offset == that.offset;
+            return timestamp == that.timestamp &&
+                    offset == that.offset &&
+                    Objects.equals(leaderEpoch, that.leaderEpoch);
         }
 
         @Override
         public int hashCode() {
-            int result = (int) (timestamp ^ (timestamp >>> 32));
-            result = 31 * result + (int) (offset ^ (offset >>> 32));
-            return result;
+            return Objects.hash(timestamp, offset, leaderEpoch);
         }
 
         @Override
@@ -521,6 +531,7 @@ public String toString() {
             return "TimestampAndOffset(" +
                     "timestamp=" + timestamp +
                     ", offset=" + offset +
+                    ", leaderEpoch=" + leaderEpoch +
                     ')';
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 637da9386e5..1945bccb613 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRec

[jira] [Commented] (KAFKA-7292) Converters should report their configuration options

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672272#comment-16672272
 ] 

Randall Hauch commented on KAFKA-7292:
--

Connect's REST API also needs to use these ConfigDefs when validating a 
configuration or describing a connector's ConfigDef.

> Converters should report their configuration options
> 
>
> Key: KAFKA-7292
> URL: https://issues.apache.org/jira/browse/KAFKA-7292
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Priority: Major
>
> Converters do not support returning their configuration like Connectors and 
> Transformations do. Given this can be configured by an end user it should 
> also be reported via the API. 
> {code:java}
> public interface Converter {
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
>   default ConfigDef config() {
> return new ConfigDef();
>   }
> }
> {code}
>  
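A hypothetical converter sketch, assuming the proposed default config() method above is added to the interface:

{code:java}
public class MyConverter implements Converter {
    private boolean schemasEnabled;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Object v = configs.get("schemas.enable");
        schemasEnabled = v == null || Boolean.parseBoolean(v.toString());
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        return new byte[0];  // serialization elided in this sketch
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        return SchemaAndValue.NULL;  // deserialization elided in this sketch
    }

    @Override
    public ConfigDef config() {
        // This is what the REST API could then surface for validation.
        return new ConfigDef().define("schemas.enable", ConfigDef.Type.BOOLEAN, true,
                ConfigDef.Importance.MEDIUM, "Whether to embed schemas in serialized data.");
    }
}
{code}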



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7292) Converters should report their configuration options

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672272#comment-16672272
 ] 

Randall Hauch edited comment on KAFKA-7292 at 11/1/18 9:46 PM:
---

As mentioned above, once we change this then Connect's REST API also needs to 
use these ConfigDefs when validating a configuration or describing a 
connector's ConfigDef.


was (Author: rhauch):
Connect's REST API also needs to use these ConfigDefs when validating a 
configuration or describing a connector's ConfigDef.

> Converters should report their configuration options
> 
>
> Key: KAFKA-7292
> URL: https://issues.apache.org/jira/browse/KAFKA-7292
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Priority: Major
>
> Converters do not support returning their configuration like Connectors and 
> Transformations do. Given this can be configured by an end user it should 
> also be reported via the API. 
> {code:java}
> public interface Converter {
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
>   default ConfigDef config() {
> return new ConfigDef();
>   }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7292) Converters should report their configuration options

2018-11-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7292:
-
Affects Version/s: 2.1.0

> Converters should report their configuration options
> 
>
> Key: KAFKA-7292
> URL: https://issues.apache.org/jira/browse/KAFKA-7292
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Jeremy Custenborder
>Priority: Major
>  Labels: needs-kip
>
> Converters do not support returning their configuration like Connectors and 
> Transformations do. Given this can be configured by an end user it should 
> also be reported via the API. 
> {code:java}
> public interface Converter {
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
>   default ConfigDef config() {
> return new ConfigDef();
>   }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7292) Converters should report their configuration options

2018-11-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7292:
-
Affects Version/s: (was: 2.1.0)
   0.10.2.0

> Converters should report their configuration options
> 
>
> Key: KAFKA-7292
> URL: https://issues.apache.org/jira/browse/KAFKA-7292
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Jeremy Custenborder
>Priority: Major
>  Labels: needs-kip
>
> Converters do not support returning their configuration like Connectors and 
> Transformations do. Given this can be configured by an end user it should 
> also be reported via the API. 
> {code:java}
> public interface Converter {
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
>   default ConfigDef config() {
> return new ConfigDef();
>   }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7292) Converters should report their configuration options

2018-11-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-7292:
-
Labels: needs-kip  (was: )

> Converters should report their configuration options
> 
>
> Key: KAFKA-7292
> URL: https://issues.apache.org/jira/browse/KAFKA-7292
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Jeremy Custenborder
>Priority: Major
>  Labels: needs-kip
>
> Converters do not support returning their configuration like Connectors and 
> Transformations do. Given this can be configured by an end user it should 
> also be reported via the API. 
> {code:java}
> public interface Converter {
>   void configure(Map var1, boolean var2);
>   byte[] fromConnectData(String var1, Schema var2, Object var3);
>   SchemaAndValue toConnectData(String var1, byte[] var2);
>   default ConfigDef config() {
> return new ConfigDef();
>   }
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7583) Producer force shutdown doesn't work when all brokers is down.

2018-11-01 Thread Vitalina Horyukova (JIRA)
Vitalina Horyukova created KAFKA-7583:
-

 Summary: Producer force shutdown doesn't work when all brokers is 
down.
 Key: KAFKA-7583
 URL: https://issues.apache.org/jira/browse/KAFKA-7583
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.11.0.0
 Environment: {code}
uname -a
Linux kassa 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 
x86_64 x86_64 x86_64 GNU/Linux
java --version
java 1.8.0_152
{code}
Reporter: Vitalina Horyukova


Hi!
When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
gets stuck indefinitely in the second join to {{KafkaProducer.ioThread}}, because 
{{KafkaProducer.ioThread}} spins forever in the {{while}} loop in 
{{Sender.maybeWaitForProducerId}}. The root cause is that 
{{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
throws an {{IOException}} on every iteration.
In the logs you can see this part repeating endlessly every `retry.backoff.ms`:
{code:java}
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
sending metadata request
[2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at 
kafka:9093.
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx 
disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109)
at 
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
at 
org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62)
at 
org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Node -1 disconnected.
[2018-11-01T16:19:47.585+03:00] WARN  [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be 
established. Broker may not be available.
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting 
InitProducerId response
java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed.
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
at 
org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
[2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [Sender] Retry InitProducerIdRequest in 100ms.
{code}
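For anyone trying to reproduce this, a hypothetical minimal reproducer (assuming idempotence is enabled, since the InitProducerId request is only issued then, and no broker is reachable):

{code:java}
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9093");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// close(0, ...) is supposed to force-close, but the observed behavior is
// that it blocks while the sender retries InitProducerId forever.
producer.close(0, TimeUnit.MILLISECONDS);
{code}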



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7583) Producer force shutdown doesn't work when all brokers is down.

2018-11-01 Thread Vitalina Horyukova (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalina Horyukova updated KAFKA-7583:
--
Environment: 
{code}
uname -a
Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 x86_64 
x86_64 x86_64 GNU/Linux
java --version
java 1.8.0_152
{code}

  was:
{code}
uname -a
Linux kassa 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 
x86_64 x86_64 x86_64 GNU/Linux
java --version
java 1.8.0_152
{code}


> Producer force shutdown doesn't work when all brokers is down.
> --
>
> Key: KAFKA-7583
> URL: https://issues.apache.org/jira/browse/KAFKA-7583
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.0
> Environment: {code}
> uname -a
> Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 
> x86_64 x86_64 x86_64 GNU/Linux
> java --version
> java 1.8.0_152
> {code}
>Reporter: Vitalina Horyukova
>Priority: Major
>
> Hi!
> When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
> gets stuck indefinitely in the second join to {{KafkaProducer.ioThread}}, because 
> {{KafkaProducer.ioThread}} spins forever in the {{while}} loop in 
> {{Sender.maybeWaitForProducerId}}. The root cause is that 
> {{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
> throws an {{IOException}} on every iteration.
> In the logs you can see this part repeating endlessly every `retry.backoff.ms`:
> {code:java}
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
> sending metadata request
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at 
> kafka:9093.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx 
> disconnected
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109)
>   at 
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Node -1 disconnected.
> [2018-11-01T16:19:47.585+03:00] WARN  [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be 
> established. Broker may not be available.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting 
> InitProducerId response
> java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed.
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | 
> produ

[jira] [Updated] (KAFKA-7583) Producer force shutdown doesn't work when all brokers is down.

2018-11-01 Thread Vitalina Horyukova (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalina Horyukova updated KAFKA-7583:
--
Environment: 
Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 x86_64 
x86_64 x86_64 GNU/Linux

java 1.8.0_152


  was:
{code}
uname -a
Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 14:43:09 UTC 2018 x86_64 
x86_64 x86_64 GNU/Linux
java --version
java 1.8.0_152
{code}


> Producer force shutdown doesn't work when all brokers is down.
> --
>
> Key: KAFKA-7583
> URL: https://issues.apache.org/jira/browse/KAFKA-7583
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.0
> Environment: Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 
> 14:43:09 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
> java 1.8.0_152
>Reporter: Vitalina Horyukova
>Priority: Major
>
> Hi!
> When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
> gets stuck indefinitely in the second join to {{KafkaProducer.ioThread}}, because 
> {{KafkaProducer.ioThread}} spins forever in the {{while}} loop in 
> {{Sender.maybeWaitForProducerId}}. The root cause is that 
> {{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
> throws an {{IOException}} on every iteration.
> In the logs you can see this part repeating endlessly every `retry.backoff.ms`:
> {code:java}
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
> sending metadata request
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at 
> kafka:9093.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx 
> disconnected
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109)
>   at 
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Node -1 disconnected.
> [2018-11-01T16:19:47.585+03:00] WARN  [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be 
> established. Broker may not be available.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting 
> InitProducerId response
> java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed.
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [Sender] Retry InitProducerIdRequest in 100ms.
> {code}
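
To make the failure mode above concrete, here is a minimal, self-contained Java 
sketch of the described hang. All names are local stand-ins (the stubbed 
hasProducerId/awaitLeastLoadedNodeReady and the hypothetical forceClose flag are 
not the actual Kafka implementation): the io thread swallows the IOException and 
retries forever, so a close() that only joins the thread can never return unless 
the loop checks some escape hatch.

{code:java}
import java.io.IOException;

public class SpinningIoThreadDemo {
    // Hypothetical escape hatch; the reported behaviour corresponds to not having one.
    private static volatile boolean forceClose = false;

    public static void main(String[] args) throws InterruptedException {
        Thread ioThread = new Thread(() -> {
            // Stand-in for the while cycle in Sender.maybeWaitForProducerId.
            while (!hasProducerId()) {
                try {
                    awaitLeastLoadedNodeReady(); // always fails: all brokers are down
                } catch (IOException e) {
                    // swallowed, as in the DEBUG log above; the loop simply retries
                }
                if (forceClose) {
                    return; // without a check like this, the loop never exits
                }
                try {
                    Thread.sleep(100); // retry.backoff.ms
                } catch (InterruptedException ignored) { }
            }
        }, "kafka-producer-network-thread");
        ioThread.start();

        Thread.sleep(300);
        forceClose = true;  // comment this line out to reproduce the hang:
        ioThread.join();    // ...this second join then blocks forever
        System.out.println("io thread terminated");
    }

    private static boolean hasProducerId() {
        return false; // a producer id is never obtained while brokers are down
    }

    private static void awaitLeastLoadedNodeReady() throws IOException {
        throw new IOException("Connection to kafka:9093 (id: -1 rack: null) failed.");
    }
}
{code}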



--
This m

[jira] [Updated] (KAFKA-7583) Producer force close doesn't work when all brokers are down.

2018-11-01 Thread Vitalina Horyukova (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalina Horyukova updated KAFKA-7583:
--
Summary: Producer force close doesn't work when all brokers are down.  (was: 
Producer force shutdown doesn't work when all brokers are down.)

> Producer force close doesn't work when all brokers are down.
> ---
>
> Key: KAFKA-7583
> URL: https://issues.apache.org/jira/browse/KAFKA-7583
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.11.0.0
> Environment: Linux xxx 3.13.0-139-generic #188-Ubuntu SMP Tue Jan 9 
> 14:43:09 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
> java 1.8.0_152
>Reporter: Vitalina Horyukova
>Priority: Major
>
> Hi!
> When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
> gets stuck forever in the second join on {{KafkaProducer.ioThread}}, because 
> {{KafkaProducer.ioThread}} spins endlessly in the {{while}} loop in 
> {{Sender.maybeWaitForProducerId}}. The root cause is that 
> {{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
> throws an {{IOException}} on every iteration.
> In the logs you can see this block repeating every `retry.backoff.ms`:
> {code:java}
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: 
> -1 rack: null)
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
> sending metadata request
> [2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at 
> kafka:9093.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx 
> disconnected
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109)
>   at 
> org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39)
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Node -1 disconnected.
> [2018-11-01T16:19:47.585+03:00] WARN  [kafka-producer-network-thread | 
> producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be 
> established. Broker may not be available.
> [2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
> producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting 
> InitProducerId response
> java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed.
>   at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:748)
> [2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | 
> producer-1] [] [] [] [Sender] Retry InitProducerIdRequest in 100ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7583) Producer force close doesn't work when all brokers are down.

2018-11-01 Thread Vitalina Horyukova (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-7583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vitalina Horyukova updated KAFKA-7583:
--
Description: 
Hi!
When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
gets stuck forever in the second join on {{KafkaProducer.ioThread}}, because 
{{KafkaProducer.ioThread}} spins endlessly in the {{while}} loop in 
{{Sender.maybeWaitForProducerId}}. The root cause is that 
{{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
throws an {{IOException}} on every iteration.
In the logs you can see this block repeating every {{retry.backoff.ms}}:
{code:java}
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
sending metadata request
[2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Initiating connection to node -1 at 
kafka:9093.
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [Selector] Connection with kafka/xxx.xxx.xxx.xxx 
disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.kafka.common.network.SslTransportLayer.finishConnect(SslTransportLayer.java:109)
at 
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
at 
org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:39)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:62)
at 
org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Node -1 disconnected.
[2018-11-01T16:19:47.585+03:00] WARN  [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Connection to node -1 could not be 
established. Broker may not be available.
[2018-11-01T16:19:47.585+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [Sender] Broker {} disconnected while awaiting 
InitProducerId response
java.io.IOException: Connection to kafka:9093 (id: -1 rack: null) failed.
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:68)
at 
org.apache.kafka.clients.producer.internals.Sender.awaitLeastLoadedNodeReady(Sender.java:409)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:418)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:203)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:748)
[2018-11-01T16:19:47.585+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [Sender] Retry InitProducerIdRequest in 100ms.
{code}

  was:
Hi!
When all Kafka brokers are down, the thread that called {{KafkaProducer.close}} 
gets stuck forever in the second join on {{KafkaProducer.ioThread}}, because 
{{KafkaProducer.ioThread}} spins endlessly in the {{while}} loop in 
{{Sender.maybeWaitForProducerId}}. The root cause is that 
{{Sender.awaitLeastLoadedNodeReady}} -> {{NetworkClientUtils.awaitReady}} 
throws an {{IOException}} on every iteration.
In the logs you can see this block repeating every `retry.backoff.ms`:
{code:java}
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] TRACE [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Found least loaded node kafka:9093 (id: -1 
rack: null)
[2018-11-01T16:19:47.583+03:00] DEBUG [kafka-producer-network-thread | 
producer-1] [] [] [] [NetworkClient] Initialize connection to node -1 for 
se

[jira] [Commented] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states

2018-11-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672309#comment-16672309
 ] 

ASF GitHub Bot commented on KAFKA-7537:
---

hzxa21 opened a new pull request #5869:  KAFKA-7537: Avoid sending full 
UpdateMetadataRequest to existing brokers in the cluster on broker changes to 
reduce controller memory footprint
URL: https://github.com/apache/kafka/pull/5869
 
 
   Currently, when brokers join/leave the cluster without any partition state 
changes, the controller sends out UpdateMetadataRequests containing the states 
of all partitions to all brokers. But for existing brokers in the cluster, the 
metadata diff between the controller and the broker should only be the 
"live_brokers"/"offline_replicas" info. Only the brokers with an empty metadata 
cache need the full UpdateMetadataRequest. Sending the full 
UpdateMetadataRequest to all brokers can place non-negligible memory pressure 
on the controller side, especially for large clusters with many brokers and a 
large number of partitions.
   
   Let's say we have N brokers and M partitions in total in the cluster, and we 
want to add 1 brand-new broker to the cluster. With RF=2, the memory footprint 
per partition in the UpdateMetadataRequest is ~200 bytes. In the current 
controller implementation, if each of the N RequestSendThreads serializes and 
sends out the UpdateMetadataRequest at roughly the same time (which is very 
likely the case), we will end up using (N+1)*M*200B of memory. However, we only 
need to send out one full UpdateMetadataRequest in this case. More detail can 
be found in the jira ticket KAFKA-7537.
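
   As a quick sanity check of that estimate, a tiny sketch (figures lifted from 
this description; this is plain arithmetic, not Kafka code):

{code:java}
public class UpdateMetadataFootprint {
    public static void main(String[] args) {
        long senders = 100;           // N RequestSendThreads serializing at once (99 brokers + 1 new)
        long partitions = 100_000;    // M partitions in the cluster
        long bytesPerPartition = 200; // ~200 B per partition entry at RF=2
        double totalGb = senders * partitions * bytesPerPartition / 1e9;
        // Prints ~2.0 GB, matching the ticket's estimate for full requests.
        System.out.printf("~%.1f GB of controller memory for full requests%n", totalGb);
    }
}
{code}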
   
   This PR avoids sending out the full UpdateMetadataRequest in the following 
scenarios:
   1. On broker startup, send the full UpdateMetadataRequest to newly added 
brokers and only send an UpdateMetadataRequest with empty partition states to 
existing brokers.
   2. On broker failure, if it doesn't require leader election, only include 
the states of partitions that are hosted by the dead broker(s) in the 
UpdateMetadataRequest instead of including all partition states.
   
   _Note that after partition state changes and replica state changes, 
UpdateMetadataRequests still need to be sent, and this behavior remains 
unchanged after this PR._
   
   This PR also introduces a minor optimization in the MetadataCache update to 
avoid copying the previous partition states upon receiving an 
UpdateMetadataRequest with no partition states.
   
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Only include live brokers in the UpdateMetadataRequest sent to existing 
> brokers if there is no change in the partition states
> -
>
> Key: KAFKA-7537
> URL: https://issues.apache.org/jira/browse/KAFKA-7537
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> Currently, when brokers join/leave the cluster without any partition state 
> changes, the controller sends out UpdateMetadataRequests containing the 
> states of all partitions to all brokers. But for existing brokers in the 
> cluster, the metadata diff between the controller and the broker should only 
> be the "live_brokers" info. Only the brokers with an empty metadata cache 
> need the full UpdateMetadataRequest. Sending the full UpdateMetadataRequest 
> to all brokers can place non-negligible memory pressure on the controller 
> side.
> Let's say we have N brokers and M partitions in total in the cluster, and we 
> want to add 1 brand-new broker to the cluster. With RF=2, the memory 
> footprint per partition in the UpdateMetadataRequest is ~200 bytes. In the 
> current controller implementation, if each of the N RequestSendThreads 
> serializes and sends out the UpdateMetadataRequest at roughly the same time 
> (which is very likely the case), we will end up using *(N+1)*M*200B*. In a 
> large Kafka cluster, we can have:
> {noformat}
> N=99
> M=100k
> Memory usage to send out UpdateMetadataRequest to all brokers:
> 100 * 100K * 200B = 2G
> However, we only need to send out full UpdateMetadataRequest to the newly 
> added broker. We only need to include live broker ids (4B * 100 broker

[jira] [Commented] (KAFKA-6793) Unnecessary warning log message

2018-11-01 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672335#comment-16672335
 ] 

Randall Hauch commented on KAFKA-6793:
--

[~mjsax], I guess I'd appreciate a comment on KAFKA-7509, since that's had most 
of the discussion so far.

> Unnecessary warning log message 
> 
>
> Key: KAFKA-6793
> URL: https://issues.apache.org/jira/browse/KAFKA-6793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Anna O
>Priority: Minor
>
> After upgrading Kafka Streams from 0.11.0.2 to 1.1.0, the following warning 
> log started to appear:
> level: WARN
> logger: org.apache.kafka.clients.consumer.ConsumerConfig
> message: The configuration 'admin.retries' was supplied but isn't a known 
> config.
> The config is not explicitly supplied to Streams.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7577) Semantics of Table-Table Join with Null Message Are Incorrect

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672341#comment-16672341
 ] 

Matthias J. Sax commented on KAFKA-7577:


Thanks for the clarification. I'm still trying to understand the scenario. For 
the regular left join we have tests, and those pass, so leftJoin itself seems 
to be correct 
(https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java).
 I'm just trying to figure out what the issue could be. There are two other bug 
reports about KTable joins, but they seem to be different 
(https://issues.apache.org/jira/browse/KAFKA-4609 and 
https://issues.apache.org/jira/browse/KAFKA-6599).

What I am wondering: if you do Y.leftJoin(Z), it is only guaranteed that the Y 
input will be in the output, while your example shows that B comes from input Z. 
Can you verify that Y.leftJoin(Z) produces an output record that contains B?

Can you reproduce the issue using `TopologyTestDriver`? That would be most 
helpful for tracking down the root cause; a sketch of such a reproduction 
follows below. Thanks a lot for your help!
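
For reference, a hedged sketch of the kind of {{TopologyTestDriver}} 
reproduction requested above, using the Kafka 2.0-era kafka-streams-test-utils 
API (topic names "Y", "Z", "out" and the join logic are placeholder 
assumptions, not code from the report):

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class LeftJoinTombstoneRepro {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> y = builder.table("Y");
        KTable<String, String> z = builder.table("Z");
        y.leftJoin(z, (l, r) -> "[" + l + ", " + r + "]").toStream().to("out");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "leftjoin-tombstone-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());

        driver.pipeInput(factory.create("Y", "k", "A"));           // expect [A, null] on "out"
        driver.pipeInput(factory.create("Y", "k", (String) null)); // tombstone: expect a null-value record
        // Per the report, the second readOutput would show nothing instead of a tombstone.
        System.out.println(driver.readOutput("out", new StringDeserializer(), new StringDeserializer()));
        System.out.println(driver.readOutput("out", new StringDeserializer(), new StringDeserializer()));
        driver.close();
    }
}
{code}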

> Semantics of Table-Table Join with Null Message Are Incorrect
> -
>
> Key: KAFKA-7577
> URL: https://issues.apache.org/jira/browse/KAFKA-7577
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Daren Thomas
>Priority: Major
>
> The observed behavior of a Table-Table join with a null message does not 
> match the semantics described in the documentation 
> (https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#ktable-ktable-join).
>   The expectation is:
>  * Message A results in [A, null] from the Left Join
>  * Message null (tombstone) results in null (tombstone) from the Left Join
>  The observed behavior was that the null (tombstone) message did not pass 
> through the Left Join to the output topic as expected.  This behavior was 
> observed with and without caching enabled, against a test harness, and 
> against a local Confluent 5.0.0 platform.  It was also observed that the 
> KTableKTableLeftJoinProcessor.process() function was not called for the null 
> (tombstone) message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations

2018-11-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672356#comment-16672356
 ] 

Matthias J. Sax commented on KAFKA-7509:


While logging at DEBUG level would prevent misleading WARN log entries, it 
seems to defeat the purpose, which is to guard against typing mistakes in 
properties files etc. (i.e., when plain strings are used instead of predefined 
constant values). Thus, I am not 100% convinced by this solution.

Thinking about this, we could instead define a prefix that is used for all 
configs that should be passed through. For all configs with this prefix, no 
name check is done, and the prefix is stripped before the config is passed into 
the internal call to `configure()`. For example, Kafka Streams would not set 
`admin.retries` but `passthrough.admin.retries`, avoiding the WARN log. (I hope 
we find a better name than "passthrough" :) ) A sketch of the idea follows 
below.

This would be a public API change though and would require a KIP. Thoughts?
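
A minimal sketch of that idea (the "passthrough." name is just the placeholder 
from this comment, not an existing Kafka API): keys carrying the prefix would 
skip the known-config check and be forwarded with the prefix stripped.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class PassthroughPrefixDemo {
    static final String PREFIX = "passthrough."; // placeholder name from the comment above

    // Strip the prefix; prefixed keys would bypass the unknown-config WARN check.
    static Map<String, Object> stripPassthrough(Map<String, Object> configs) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : configs.entrySet()) {
            String key = e.getKey();
            out.put(key.startsWith(PREFIX) ? key.substring(PREFIX.length()) : key, e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("passthrough.admin.retries", 5);
        configs.put("bootstrap.servers", "localhost:9092");
        // Prints admin.retries=5 and bootstrap.servers=localhost:9092 (order may vary).
        System.out.println(stripPassthrough(configs));
    }
}
{code}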

> Kafka Connect logs unnecessary warnings about unused configurations
> ---
>
> Key: KAFKA-7509
> URL: https://issues.apache.org/jira/browse/KAFKA-7509
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>
> When running Connect, the logs contain quite a few warnings about "The 
> configuration '{}' was supplied but isn't a known config." This occurs when 
> Connect creates producers, consumers, and admin clients, because the 
> AbstractConfig is logging unused configuration properties upon construction. 
> It's complicated by the fact that the Producer, Consumer, and AdminClient all 
> create their own AbstractConfig instances within the constructor, so we can't 
> even call its {{ignore(String key)}} method.
> See also KAFKA-6793 for a similar issue with Streams.
> There are no arguments in the Producer, Consumer, or AdminClient constructors 
> to control whether the configs log these warnings, so a simpler workaround 
> is to only pass those configuration properties to the Producer, Consumer, and 
> AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig 
> configdefs know about.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672410#comment-16672410
 ] 

Jason Gustafson commented on KAFKA-7531:


[~spuzon] Can you post the git commit id for the broker? It should be in the 
log when the server starts up. The reason I ask is that the line numbers in the 
trace do not appear to match up with the code in 2.0.0. 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams application, 4 instances, each with 5 Streams threads, 20 stream 
> threads in total.
> I observe an NPE (NullPointerException) at the coordinator broker which 
> causes all application stream threads to shut down; here's the stack from 
> the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thr

[jira] [Comment Edited] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-11-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672410#comment-16672410
 ] 

Jason Gustafson edited comment on KAFKA-7531 at 11/2/18 12:43 AM:
--

[~spuzon] Can you post the git commit id for the broker? It should be in the 
log when the server starts up. The reason I ask is that the line numbers in the 
trace do not appear to match the code in 2.0.0. 


was (Author: hachikuji):
[~spuzon] Can you post the git commit id for the broker? It should be in the 
log when the server starts up. The reason I ask is that the line numbers in the 
trace do not appear to match up the code in 2.0.0. 

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper.
> Streams application, 4 instances, each with 5 Streams threads, 20 stream 
> threads in total.
> I observe an NPE (NullPointerException) at the coordinator broker which 
> causes all application stream threads to shut down; here's the stack from 
> the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member 
> elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe
>  in group elo
> g_agg has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance 
> group elog_agg with old generation 49 (__consumer_offsets-21) 
> (kafka.coordinator.gro
> up.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group 
> elog_agg generation 50 (__consumer_offsets-21) 
> (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from 
> leader for group elog_agg for generation 50 
> (kafka.coordinator.group.GroupCoordina
> tor)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized 
> transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on 
> partition _
> _transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr
> ue} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-11-01 Thread Sarvesh Tamba (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16672562#comment-16672562
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

For our test-validation purposes, we need to validate the tests as both root 
and non-root users; hence the requirement to run the tests as root. Were the 
unit tests intended to be run only as a non-root user, and is the failure of 
the unit test when run as root expected behaviour? A small probe of the 
underlying permission behaviour follows below.
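
For illustration, a small self-contained probe of that permission behaviour 
(plain java.io against a scratch directory; this is not the Kafka test itself): 
the read-only bit blocks mkdir for an ordinary user but is ignored for root, so 
no exception can be raised.

{code:java}
import java.io.File;

public class ReadOnlyDirProbe {
    public static void main(String[] args) {
        File parent = new File(System.getProperty("java.io.tmpdir"), "readOnlyDirProbe");
        parent.mkdirs();
        parent.setReadOnly(); // roughly chmod a-w on the directory

        File child = new File(parent, "child");
        // Prints true when run as root (permission bits are bypassed),
        // false for a non-root user (mkdir is denied, as the test expects).
        System.out.println("mkdir succeeded: " + child.mkdir());
    }
}
{code}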

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:-
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:-
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test, 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir", creates a temporary 
> directory and marks it as read-only. The unit test is intended to throw a 
> "ProcessorStateException" when the read-only temporary directory is 
> opened/accessed.
> By default, non-root users are not allowed to write into a read-only 
> directory, so the unit test rightly hits an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:-
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw a "ProcessorStateException" when the 
> read-only temporary directory is opened, and the unit test rightly fails for 
> a root user.
> Two approaches to resolving this failing unit test:-
> 1.) Run the unit tests as a non-root user (simplest).
> 2.) If running the unit test as root, make the temporary file directory 
> immutable in t