[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-03-16 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363051#comment-14363051
 ] 

Honghai Chen commented on KAFKA-1646:
-

Hey [~jkreps] Would it be OK to add a configuration such as "log.preallocatefile" 
and change the three "if Os.IsWindows" checks to consult that configuration 
instead?
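
For illustration, a minimal sketch of what that flag-gated check could look 
like (the config name, method shape, and use of setLength here are assumptions 
for illustration, not the actual patch):

    // Hedged sketch: gate preallocation on a broker config instead of the OS check.
    import java.io.{File, RandomAccessFile}

    def openSegmentChannel(file: File, preallocate: Boolean, segmentBytes: Long) = {
      val raf = new RandomAccessFile(file, "rw")
      if (preallocate)              // was: if (Os.isWindows)
        raf.setLength(segmentBytes) // reserve the full segment up front so it stays contiguous
      raf.getChannel
    }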

> Improve consumer read performance for Windows
> -
>
> Key: KAFKA-1646
> URL: https://issues.apache.org/jira/browse/KAFKA-1646
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1
> Environment: Windows
>Reporter: xueqiang wang
>Assignee: xueqiang wang
>  Labels: newbie, patch
> Attachments: Improve consumer read performance for Windows.patch, 
> KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
> KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, 
> KAFKA-1646_20150312_200352.patch
>
>
> This patch is for the Windows platform only. On Windows, if more than one 
> replica is writing to disk, the segment log files become fragmented on disk 
> and consumer read performance drops significantly. This fix allocates more 
> disk space when rolling a new segment, which improves consumer read 
> performance on the NTFS file system.
> The patch doesn't affect file allocation on other filesystems, since it only 
> adds statements like 'if(Os.isWindows)' or methods used only on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


java.net.BindException: Address already in use

2015-03-16 Thread Tong Li

Hi guys, when I ran the tests I got a lot of these exceptions. I wonder if you
have had similar problems running the tests and how you resolved the issue.

Thanks.

Tong Li
OpenStack & Kafka Community Development
Building 501/B205
liton...@us.ibm.com

[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled

2015-03-16 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363129#comment-14363129
 ] 

Dmitry Bugaychenko commented on KAFKA-1305:
---

Even with a fast dedicated channel there will be a race condition in switching 
leadership. It could be removed either by complicating the protocol (e.g. the 
new leader should take leadership only after getting a "not a leader" response 
in its fetcher thread from the old one, while the old leader should stop 
handling produce requests, allowing fetches only from the new leader until it 
gets everything), or perhaps it is worth considering getting rid of the 
controller in partition leader election and using distributed elections in ZK.
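
Purely as an illustration of the first option, a sketch of the handshake 
described above (all names are invented; this is not a proposed 
implementation):

    // The new leader keeps fetching until the old leader confirms it has
    // stepped down; only then does it start accepting produce requests.
    sealed trait FetchResult
    case object Data extends FetchResult
    case object NotLeader extends FetchResult

    def takeOverLeadership(fetchFromOldLeader: () => FetchResult): Unit = {
      while (fetchFromOldLeader() != NotLeader) {
        // old leader still owns the partition; keep replicating
      }
      // safe point: the old leader has rejected us, so it no longer accepts produces
    }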

> Controller can hang on controlled shutdown with auto leader balance enabled
> ---
>
> Key: KAFKA-1305
> URL: https://issues.apache.org/jira/browse/KAFKA-1305
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Sriharsha Chintalapani
>Priority: Blocker
> Fix For: 0.8.2.0, 0.9.0
>
> Attachments: KAFKA-1305.patch, KAFKA-1305.patch, 
> KAFKA-1305_2014-10-13_07:30:45.patch
>
>
> This is relatively easy to reproduce especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new 
> controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the meantime, the automatically scheduled preferred replica leader 
> election process kicked in and started sending 
> LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) 
> (t@113 below).
> 4. While that's happening, the controlled shutdown process on 265 succeeds 
> and proceeds to deregister itself from ZooKeeper and shuts down the socket 
> server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the 
> controller channel manager's list of brokers to send requests to.  However, 
> that removal cannot take place (t@18 below) because preferred replica leader 
> election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is blocked on controller shutdown for 
> the same reason (it needs to acquire the controller lock).
> Relevant portions from the thread-dump:
> "Controller-265-to-broker-265-send-thread" - Thread t@113
>java.lang.Thread.State: TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at 
> kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
>   at kafka.utils.Utils$.swallow(Utils.scala:167)
>   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>   at kafka.utils.Logging$class.swallow(Logging.scala:94)
>   at kafka.utils.Utils$.swallow(Utils.scala:46)
>   at 
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
>   at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
>   - locked java.lang.Object@6dbf14a7
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>Locked ownable synchronizers:
>   - None
> ...
> "Thread-4" - Thread t@17
>java.lang.Thread.State: WAITING on 
> java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: 
> kafka-scheduler-0
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>   at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>   at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>   at kafka.utils.Utils$.inLock(Utils.scala:536)
>   at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
>   at 
> kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
>   at kafka.utils.Utils$.swallow(Utils.scala:167)
>   at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>   at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
>   at kafka.utils.Logging$class.swallow(Logging.scala:94)
>   at kafka.utils.Utils$.swallow(Utils.scala:46)
>   at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
>   at 
> kafka.server.KafkaServerStartable.shutdown(KafkaServerS

[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-03-16 Thread Tong Li (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363164#comment-14363164
 ] 

Tong Li commented on KAFKA-1926:


@Jun Rao, awesome comments. I will follow the directions and provide a new 
patch set. This also confirms the direction I am going in. Thanks.

> Replace kafka.utils.Utils with o.a.k.common.utils.Utils
> ---
>
> Key: KAFKA-1926
> URL: https://issues.apache.org/jira/browse/KAFKA-1926
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>  Labels: newbie, patch
> Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch
>
>
> There is currently a lot of duplication between the Utils class in common and 
> the one in core.
> Our plan has been to deprecate duplicate code in the server and replace it 
> with the new common code.
> As such we should evaluate each method in the scala Utils and do one of the 
> following:
> 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
> utility in active use that is not Kafka-specific. If we migrate it we should 
> really think about the API and make sure there is some test coverage. A few 
> things in there are kind of funky and we shouldn't just blindly copy them 
> over.
> 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
> any utilities that really need to make use of Scala features to be convenient.
> 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2021) Consolidate test classes for KafkaConfig

2015-03-16 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2021:
---

 Summary: Consolidate test classes for KafkaConfig
 Key: KAFKA-2021
 URL: https://issues.apache.org/jira/browse/KAFKA-2021
 Project: Kafka
  Issue Type: Task
Reporter: Gwen Shapira
Priority: Minor


We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest and 
kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala)

I think consolidating them into one test class (or at least renaming them so 
it is clear how they differ) would make a lot of sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: java.net.BindException: Address already in use

2015-03-16 Thread Jun Rao
This is being tracked in KAFKA-1501. Typically, this won't happen on a
dedicated machine.

Thanks,

Jun

On Mon, Mar 16, 2015 at 5:02 AM, Tong Li  wrote:

>
> Hi guys, when I ran the tests I got a lot of these exceptions. I wonder if you
> have had similar problems running the tests and how you resolved the issue.
>
> Thanks.
>
> Tong Li
> OpenStack & Kafka Community Development
> Building 501/B205
> liton...@us.ibm.com


[jira] [Commented] (KAFKA-1994) Evaluate performance effect of chroot check on Topic creation

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363349#comment-14363349
 ] 

Gwen Shapira commented on KAFKA-1994:
-

New patch looks good. 
[~singhashish], can you share how the performance of the code with the new 
patch compares to that of the older solution, and to createPersistent() 
without any of the checks?

> Evaluate performance effect of chroot check on Topic creation
> -
>
> Key: KAFKA-1994
> URL: https://issues.apache.org/jira/browse/KAFKA-1994
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
> Attachments: KAFKA-1994.patch, KAFKA-1994_2015-03-03_18:19:45.patch
>
>
> KAFKA-1664 adds a check for the chroot while creating a node in ZK. ZkPath 
> checks whether the namespace exists before trying to create a path in ZK. 
> This raises the concern that checking the namespace for each path creation 
> might be unnecessary and can potentially make creation expensive.
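
For illustration, a hedged sketch of the caching idea (the class shape and 
names are assumptions, not the actual patch): verify the namespace once per 
client instead of on every creation.

    import org.I0Itec.zkclient.ZkClient

    object ZkPath {
      @volatile private var namespaceVerified = false

      def createPersistent(client: ZkClient, path: String): Unit = {
        if (!namespaceVerified) {            // pay the existence check at most once
          if (!client.exists("/"))
            throw new IllegalStateException("ZooKeeper namespace (chroot) does not exist")
          namespaceVerified = true
        }
        client.createPersistent(path, true)  // create the path, including parents
      }
    }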



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Question about concurrency during Log config change

2015-03-16 Thread Andrii Biletskyi
Hi all,

I was looking through the code related to the "dynamic Log config change"
feature and noticed the way we deal with concurrency there. I have a question
about it.

The Log class holds a volatile LogConfig property, and almost all methods in
Log.scala are synchronized on a private lock object. But the code in
TopicConfigManager
(
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
)
which "substitutes" the Log's logConfig is not synchronized.

Code execution example:
Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288

Thread 2: handles a log config change -> TopicConfigManager:108 (see above)
substitutes the log's config, changing *maxMessageSize* and *segmentSize*

Thread 1: Log.append -> Log:299 - code accesses config.*segmentSize* and
picks up the updated config setting
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299

So it looks like we accessed the object in a partial state: within the scope
of one procedure (Log.append) we took one setting from the old state
(maxMessageSize) and the other from the updated state.

Methods in Log are synchronized, as mentioned above. But logConfig is only
volatile, which solves visibility problems but doesn't prevent it from being
changed by another thread, as I understand it.

Am I missing something here?

Thanks,
Andrii Biletskyi


Re: Question about concurrency during Log config change

2015-03-16 Thread Jay Kreps
You are correct. Each read will be a valid value but there is no guarantee
that subsequent reads will read from the same config. I don't think that is
a problem, do you? If we want to strengthen the guarantee we can grab the
config once in the method
   val config = log.config
and then do however many accesses against that variable which will remain
constant even if the config is updated in the course of the method.

-Jay
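
A minimal, self-contained sketch of the pattern Jay describes (Log internals 
are simplified here; only the "capture once" line is the point):

    case class LogConfig(maxMessageSize: Int, segmentSize: Long)

    class Log(@volatile var config: LogConfig) {
      def shouldRoll(messageBytes: Int, currentSegmentBytes: Long): Boolean = {
        val config = this.config  // one volatile read: a consistent snapshot
        require(messageBytes <= config.maxMessageSize, "message too large")
        // the roll decision reads segmentSize from the same snapshot, even if
        // TopicConfigManager swaps this.config concurrently
        currentSegmentBytes + messageBytes > config.segmentSize
      }
    }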

On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi <
andrii.bilets...@stealth.ly> wrote:

> Hi all,
>
> I was looking through the code related to the "dynamic Log config change"
> feature and noticed the way we deal with concurrency there. I have a question
> about it.
>
> The Log class holds a volatile LogConfig property, and almost all methods in
> Log.scala are synchronized on a private lock object. But the code in
> TopicConfigManager
> (
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
> )
> which "substitutes" the Log's logConfig is not synchronized.
>
> Code execution example:
> Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
>
> Thread 2: handles a log config change -> TopicConfigManager:108 (see above)
> substitutes the log's config, changing *maxMessageSize* and *segmentSize*
>
> Thread 1: Log.append -> Log:299 - code accesses config.*segmentSize* and
> picks up the updated config setting
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
>
> So it looks like we accessed the object in a partial state: within the scope
> of one procedure (Log.append) we took one setting from the old state
> (maxMessageSize) and the other from the updated state.
>
> Methods in Log are synchronized, as mentioned above. But logConfig is only
> volatile, which solves visibility problems but doesn't prevent it from being
> changed by another thread, as I understand it.
>
> Am I missing something here?
>
> Thanks,
> Andrii Biletskyi
>


[jira] [Assigned] (KAFKA-2021) Consolidate test classes for KafkaConfig

2015-03-16 Thread Andrii Biletskyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrii Biletskyi reassigned KAFKA-2021:
---

Assignee: Andrii Biletskyi

> Consolidate test classes for KafkaConfig
> 
>
> Key: KAFKA-2021
> URL: https://issues.apache.org/jira/browse/KAFKA-2021
> Project: Kafka
>  Issue Type: Task
>Reporter: Gwen Shapira
>Assignee: Andrii Biletskyi
>Priority: Minor
>
> We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest and 
> kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala)
> I think consolidating them into one test class (or at least renaming them so 
> it is clear how they differ) would make a lot of sense.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1809:

Attachment: KAFKA-1809_2015-03-16_09:02:18.patch

> Refactor brokers to allow listening on multiple ports and IPs 
> --
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
> KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
> KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
> KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
> KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
> KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
> KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
> KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
> KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
> KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
> KAFKA-1809_2015-03-16_09:02:18.patch
>
>
> The goal is to eventually support different security mechanisms on different 
> ports. 
> Currently brokers are defined as a host+port pair, and this definition exists 
> throughout the code-base, therefore some refactoring is needed to support 
> multiple ports for a single broker.
> The detailed design is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers
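
For a concrete flavor of the design, a hedged sketch of parsing a listener 
string such as "PLAINTEXT://broker1:9092" (the real EndPoint class in the 
patch may differ):

    case class EndPoint(host: String, port: Int, protocol: String)

    object EndPoint {
      def fromString(listener: String): EndPoint = {
        val uri = new java.net.URI(listener)  // the URI scheme carries the security protocol
        EndPoint(uri.getHost, uri.getPort, uri.getScheme.toUpperCase)
      }
    }

    // EndPoint.fromString("PLAINTEXT://broker1:9092") == EndPoint("broker1", 9092, "PLAINTEXT")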



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28769: Patch for KAFKA-1809

2015-03-16 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/
---

(Updated March 16, 2015, 4:02 p.m.)


Review request for kafka.


Bugs: KAFKA-1809
https://issues.apache.org/jira/browse/KAFKA-1809


Repository: kafka


Description (updated)
---

squashing multi-broker-endpoint patches


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
PRE-CREATION 
  core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION 
  core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/cluster/BrokerTest.scala PRE-CREATION 
  system_test/run_all.sh PRE-CREATION 
  system_test/run_all_replica.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/28769/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363399#comment-14363399
 ] 

Gwen Shapira commented on KAFKA-1809:
-

Updated reviewboard https://reviews.apache.org/r/28769/diff/
 against branch trunk

> Refactor brokers to allow listening on multiple ports and IPs 
> --
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
> KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
> KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
> KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
> KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
> KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
> KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
> KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
> KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
> KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
> KAFKA-1809_2015-03-16_09:02:18.patch
>
>
> The goal is to eventually support different security mechanisms on different 
> ports. 
> Currently brokers are defined as a host+port pair, and this definition exists 
> throughout the code-base, therefore some refactoring is needed to support 
> multiple ports for a single broker.
> The detailed design is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363429#comment-14363429
 ] 

Gwen Shapira commented on KAFKA-1809:
-

[~junrao]:

1. Addressed all sub-points
2. I think using actual version numbers in the config is more admin-friendly. I 
changed the parseConfig function to just use the 3 significant version numbers 
(a sketch of that idea follows this list).
3. Fixed
4. good catch, that was silly :) fixed.
5. Fixed
6. I had to change this configuration in order to test end-to-end with a 
non-default protocol
7. Fixed
8. Yep. 
9. Merge bug. Fixed this.
10. TRACE is used for testing only, because it was important to make sure that 
things still work when I use a non-default protocol. For example in 
SocketServerTest, but I also used it in manual testing.
11. Fixed
12. For performance reasons, especially when validating the segments at the 
end. In the current patch I changed it for all replica testcases.
13. Ick! UpdateMetadataRequest was missing from the ser/de test suite! Added 
it and validated that it catches the issue.
14. Ran system-tests (replica testcases only)
14.2 Ran with console producer and consumer. I'll open a separate JIRA for the 
rest of the tools, but I think it can go after we add the security protocol 
implementations.
14.3 Opened separate JIRA for this.
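
A hedged sketch of the "3 significant version numbers" idea from point 2 above 
(parseConfig's real shape is not shown here; this assumes versions always have 
at least three dot-separated components):

    // "0.8.2.0" and "0.8.2.1" both reduce to (0, 8, 2) and compare equal
    def significantVersion(v: String): (Int, Int, Int) = {
      val p = v.split("\\.").take(3).map(_.toInt)
      (p(0), p(1), p(2))
    }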



> Refactor brokers to allow listening on multiple ports and IPs 
> --
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
> KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
> KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
> KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
> KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
> KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
> KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
> KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
> KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
> KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
> KAFKA-1809_2015-03-16_09:02:18.patch
>
>
> The goal is to eventually support different security mechanisms on different 
> ports. 
> Currently brokers are defined as a host+port pair, and this definition exists 
> throughout the code-base, therefore some refactoring is needed to support 
> multiple ports for a single broker.
> The detailed design is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-03-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1928:
-
Issue Type: Sub-task  (was: Improvement)
Parent: KAFKA-1682

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network

2015-03-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1928:
-
Component/s: security

> Move kafka.network over to using the network classes in 
> org.apache.kafka.common.network
> ---
>
> Key: KAFKA-1928
> URL: https://issues.apache.org/jira/browse/KAFKA-1928
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
>
> As part of the common package we introduced a bunch of network related code 
> and abstractions.
> We should look into replacing a lot of what is in kafka.network with this 
> code. Duplicate classes include things like Receive, Send, etc. It is likely 
> possible to also refactor the SocketServer to make use of Selector which 
> should significantly simplify its code.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Question about concurrency during Log config change

2015-03-16 Thread Andrii Biletskyi
Jay,

Thanks for the quick response. Yes, this might not be that harmful for users;
I'm not sure about that. But it definitely looks like a data race. Your
solution is simple and should work, though it's hard to say promptly when it
comes to concurrency.

Initially I was looking through this code to understand whether we can reuse
this approach for the global broker config. In that case your solution will be
harder to implement, since we access the broker's config in many, many
different places. But that's another story.

Thanks,
Andrii Biletskyi

On Mon, Mar 16, 2015 at 5:56 PM, Jay Kreps  wrote:

> You are correct. Each read will be a valid value but there is no guarantee
> that subsequent reads will read from the same config. I don't think that is
> a problem, do you? If we want to strengthen the guarantee we can grab the
> config once in the method
>val config = log.config
> and then do however many accesses against that variable which will remain
> constant even if the config is updated in the course of the method.
>
> -Jay
>
> On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi <
> andrii.bilets...@stealth.ly> wrote:
>
> > Hi all,
> >
> > I was looking through the code related to the "dynamic Log config change"
> > feature and noticed the way we deal with concurrency there. I have a
> > question about it.
> >
> > The Log class holds a volatile LogConfig property, and almost all methods
> > in Log.scala are synchronized on a private lock object. But the code in
> > TopicConfigManager
> > (
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
> > )
> > which "substitutes" the Log's logConfig is not synchronized.
> >
> > Code execution example:
> > Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
> >
> > Thread 2: handles a log config change -> TopicConfigManager:108 (see above)
> > substitutes the log's config, changing *maxMessageSize* and *segmentSize*
> >
> > Thread 1: Log.append -> Log:299 - code accesses config.*segmentSize* and
> > picks up the updated config setting
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
> >
> > So it looks like we accessed the object in a partial state: within the
> > scope of one procedure (Log.append) we took one setting from the old state
> > (maxMessageSize) and the other from the updated state.
> >
> > Methods in Log are synchronized, as mentioned above. But logConfig is only
> > volatile, which solves visibility problems but doesn't prevent it from
> > being changed by another thread, as I understand it.
> >
> > Am I missing something here?
> >
> > Thanks,
> > Andrii Biletskyi
> >
>


Re: Review Request 28769: Patch for KAFKA-1809

2015-03-16 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28769/
---

(Updated March 16, 2015, 4:41 p.m.)


Review request for kafka.


Bugs: KAFKA-1809
https://issues.apache.org/jira/browse/KAFKA-1809


Repository: kafka


Description (updated)
---

forgot rest of patch


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
fa9daaef66ff7961e1c46cd0cd8fed18a53bccd8 
  clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
920b51a6c3c99639fbc9dc0656373c19fabd 
  clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
c899813d55b9c4786adde3d840f040d6645d27c8 
  config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 
  core/src/main/scala/kafka/admin/AdminUtils.scala 
b700110f2d7f1ede235af55d8e37e1b5592c6c7d 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
f400b71f8444fffd3fc1d8398a283682390eba4e 
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
24aaf954dc42e2084454fa5fc9e8f388ea95c756 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 
  core/src/main/scala/kafka/api/TopicMetadata.scala 
0190076df0adf906ecd332284f222ff974b315fc 
  core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
92ac4e687be22e4800199c0666bfac5e0059e5bb 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
530982e36b17934b8cc5fb668075a5342e142c59 
  core/src/main/scala/kafka/client/ClientUtils.scala 
ebba87f0566684c796c26cb76c64b4640a5ccfde 
  core/src/main/scala/kafka/cluster/Broker.scala 
0060add008bb3bc4b0092f2173c469fce0120be6 
  core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION 
  core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION 
  core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
9ebbee6c16dc83767297c729d2d74ebbd063a993 
  core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
152fda5d1dcdf319399fdeeb8457006090ebe56c 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
cca815a128419e146feff53adaeddc901bb5de1f 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
c582191636f6188c25d62a67ff0315b56f163133 
  core/src/main/scala/kafka/controller/KafkaController.scala 
09fc46d759b74bcdad2d2a610d9c5a93ff02423f 
  core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
d281bb31a66fd749ecddfbe38479b6903f436831 
  core/src/main/scala/kafka/javaapi/TopicMetadata.scala 
f384e04678df10a5b46a439f475c63371bf8e32b 
  core/src/main/scala/kafka/network/RequestChannel.scala 
7b1db3dbbb2c0676f166890f566c14aa248467ab 
  core/src/main/scala/kafka/network/SocketServer.scala 
76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
  core/src/main/scala/kafka/producer/ProducerPool.scala 
43df70bb461dd3e385e6b20396adef3c4016a3fc 
  core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
20c00cb8cc2351950edbc8cb1752905a0c26e79f 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
e731df4b2a3e44aa3d761713a09b1070aff81430 
  core/src/main/scala/kafka/server/KafkaApis.scala 
35af98f0bc1b6a50bd1d97a30147593f8c6a422d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
7907987e43404487382de7f4cc294f0d01ac15a7 
  core/src/main/scala/kafka/server/KafkaServer.scala 
dddef938fabae157ed8644536eb1a2f329fb42b7 
  core/src/main/scala/kafka/server/MetadataCache.scala 
6aef6e4508ecadbbcc1e12bed2054547b7aa333e 
  core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
351dbbad3bdb709937943b336a5b0a9e0162a5e2 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
d1e7c434e77859d746b8dc68dd5d5a3740425e79 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 
  core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
b4f903b6c7c3bb725cac7c05eb1f885906413c4d 
  core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 
111c9a8b94ce45d95551482e9fd3f8c1cccbf548 
  core/src/main/scala/kafka/utils/Utils.scala 
738c1af9ef5de16fdf5130daab69757a14c48b5c 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
7ae999ec619443d35a9cb8fbcd531fca0c51c8c0 
  core/sr

[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1809:

Attachment: KAFKA-1809_2015-03-16_09:40:49.patch

> Refactor brokers to allow listening on multiple ports and IPs 
> --
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
> KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
> KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
> KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
> KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
> KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
> KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
> KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
> KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
> KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
> KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch
>
>
> The goal is to eventually support different security mechanisms on different 
> ports. 
> Currently brokers are defined as a host+port pair, and this definition exists 
> throughout the code-base, therefore some refactoring is needed to support 
> multiple ports for a single broker.
> The detailed design is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Agreed. Since throwing an exception when close() is called in a callback won’t
work (we catch all exceptions thrown from callbacks, so it would just lead to
an error log), blocking might be the only option we have here.

Jiangjie (Becket) Qin
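
A hedged sketch of why a thrown exception would vanish (sender internals are 
simplified for illustration; this is not the actual producer code):

    // User callbacks run inside a catch-all, so an exception thrown by
    // close() inside a callback only ever becomes a log line.
    def fireCallback(callback: () => Unit): Unit =
      try callback()
      catch {
        case e: Exception =>
          System.err.println(s"Error executing user-provided callback: $e") // swallowed
      }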

On 3/15/15, 11:56 AM, "Jay Kreps"  wrote:

>Cool.
>
>I think blocking is good or alternately throwing an exception directly from
>close(). Basically I would just worry about subtly doing something slightly
>different from what the user asked for as it will be hard to notice that
>behavior difference.
>
>-Jay
>
>On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin 
>wrote:
>
>> Hi Jay,
>>
>> I have modified the KIP as you suggested. I think as long as we have a
>> consistent definition of timeout across the Kafka interfaces, there would
>> be no problem. And I also agree it is better if we can make the producer
>> block when close() is called from the sender thread so the user will
>> notice something went wrong.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/14/15, 11:37 AM, "Jay Kreps"  wrote:
>>
>> >Hey Jiangjie,
>> >
>> >I think this is going to be very confusing that
>> >  close(0) waits indefinitely and
>> >  close(-1) waits for 0.
>> >I understand this appears in other apis, but it is a constant cause of
>> >bugs. Let's not repeat that mistake.
>> >
>> >Let's make close(0) wait for 0. We don't need a way to wait indefinitely
>> >as we already have close(), so having a magical constant for that is
>> >redundant.
>> >
>> >Calling close() from the I/O thread was already possible and would block
>> >indefinitely. I think trying to silently change the behavior is probably
>> >not right. I.e. if the user calls close() in the callback there is
>> >actually some misunderstanding and they need to think more; silently
>> >making this not block will hide the problem from them, which is the
>> >opposite of what we want.
>> >
>> >-Jay
>> >
>> >On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
>>
>> >wrote:
>> >
>> >> Hey Joe & Jay,
>> >>
>> >> Thanks for the comments on the voting thread. Since it seems we probably
>> >> will have more discussion on this, I am just replying from the discussion
>> >> thread here.
>> >> I’ve updated the KIP page to make it less half-baked; apologies for the
>> >> rush...
>> >>
>> >> The contract in the current KIP is:
>> >>   1. close() - wait until all requests either are sent or reach the
>> >> request timeout.
>> >>   2. close(-1, TimeUnit.MILLISECONDS) - close immediately
>> >>   3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait
>> >> until all requests are sent or reach the request timeout
>> >>   4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5
>> >> milliseconds; if something went wrong, just shut down the producer anyway,
>> >> my callback will handle the failures.
>> >>
>> >> About how we define what the timeout value stands for, I actually
>> >> struggled a little bit when I wrote the patch. Intuitively, close(0)
>> >> should mean immediately; however, it seems that all the existing Java
>> >> classes have the convention that timeout=0 means no timeout or never
>> >> time out (Thread.join(0), Object.wait(0), etc.). So the dilemma is either
>> >> we follow the intuition or we follow the convention. What I chose is to
>> >> follow the convention but document the interface to make users aware of
>> >> the usage. The reason is that I think producer.close() is a public
>> >> interface, so it might be better to follow the Java convention, whereas
>> >> the selector is not a public interface used by users, so as long as it
>> >> makes sense to us it is less of a problem to differ from the Java
>> >> convention. That said, since consumer.poll(timeout) is also a public
>> >> interface, I think it also makes sense to give producer.close() the same
>> >> definition of timeout as consumer.poll(timeout).
>> >>
>> >> The main argument for keeping a timeout in close would be separating the
>> >> close timeout from the request timeout, which probably makes sense. I
>> >> would guess typically the request timeout would be long (e.g. 60 seconds)
>> >> because we might want to consider retries with backoff time. If we have
>> >> multiple batches in the accumulator, in the worst case it could take up
>> >> to several minutes to complete all the requests. But when we close a
>> >> producer, we might not want to wait that long, as it might cause some
>> >> other problem like a deployment tool timeout.
>> >>
>> >> There is also a subtle difference between close(timeout) and
>> >> flush(timeout). The only purpose of flush() is to write data to the
>> >> broker, so it makes perfect sense to wait until the request timeout. I
>> >> think that is why flush(timeout) looks strange. On the other hand, the
>> >> top priority for close() is to close the producer rather than flush data,
>> >> so close(timeout) gives a guarantee of bounded waiting for its main job.
>> >>
>> >> Sorry for the confusion about the forceClose flag. It is n

[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363465#comment-14363465
 ] 

Gwen Shapira commented on KAFKA-1809:
-

Updated reviewboard https://reviews.apache.org/r/28769/diff/
 against branch trunk

> Refactor brokers to allow listening on multiple ports and IPs 
> --
>
> Key: KAFKA-1809
> URL: https://issues.apache.org/jira/browse/KAFKA-1809
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
> Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, 
> KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
> KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
> KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
> KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
> KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
> KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
> KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
> KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
> KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
> KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch
>
>
> The goal is to eventually support different security mechanisms on different 
> ports. 
> Currently brokers are defined as a host+port pair, and this definition exists 
> throughout the code-base, therefore some refactoring is needed to support 
> multiple ports for a single broker.
> The detailed design is here: 
> https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
It seems there are two options we can choose from when close() is called
from the sender thread (callback):
1. Log an error and close the producer using close(-1)
2. Log an error and block.
(Throwing an exception will not work because we catch all exceptions thrown
from user callbacks; it would just lead to an error log.)

My concern with the first option is that the producer will be closed even
though we logged an error. I wonder whether some users would even look at the
log if the producer closes normally: from the program's behavior, everything
looks good. If that is the case, the error message we logged will probably
just be ignored until some day when people check the log and see it.

As for the second option, because the producer does not close but blocks,
users will notice it the first time they run the program. They will probably
look at the log to see why the producer could not be closed and will see the
error log we put there. So they get informed about this misuse of close() in
the sender thread the first time they run the code instead of some time later.

Personally I prefer the second one because it makes it more obvious that
something went wrong.

Jiangjie (Becket) Qin
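
For reference, a usage sketch of the close() contract quoted downthread (the 
timed overload is the KIP-15 proposal, not the existing API; the trait below 
exists only to make the draft semantics concrete):

    import java.util.concurrent.TimeUnit

    trait ProposedProducer {
      def close(): Unit
      def close(timeout: Long, unit: TimeUnit): Unit  // hypothetical, per the KIP draft
    }

    def shutdownExamples(p: ProposedProducer): Unit = {
      p.close()                          // wait until sends finish or hit the request timeout
      p.close(0, TimeUnit.MILLISECONDS)  // equivalent to close() under the draft convention
      p.close(-1, TimeUnit.MILLISECONDS) // close immediately
      p.close(5, TimeUnit.MILLISECONDS)  // best effort for 5 ms, then shut down anyway
    }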

On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:

>Yeah I agree we should not silently change the behavior of the function
>with the given parameters; and I would prefer error-logging-and-shutdown
>over blocking when close(>0) is used, since as Neha suggested blocking
>would also not proceed with sending any data, but will just let users
>realize the issue later rather than sooner.
>
>On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede  wrote:
>
>> >
>> > And I also agree it is better if we can make the producer block when
>> > close() is called from the sender thread so the user will notice
>> > something went wrong.
>>
>>
>> This isn't a great experience either. Why can't we just throw an exception
>> for a behavior we know is incorrect and we'd like the user to know.
>> Blocking as a means of doing that seems wrong and annoying.
>>
>> On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps  wrote:
>>
>> > Cool.
>> >
>> > I think blocking is good or alternately throwing an exception directly
>> > from close(). Basically I would just worry about subtly doing something
>> > slightly different from what the user asked for as it will be hard to
>> > notice that behavior difference.
>> >
>> > -Jay
>> >
>> > On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
>>> >
>> > wrote:
>> >
>> > > Hi Jay,
>> > >
>> > > I have modified the KIP as you suggested. I think as long as we have a
>> > > consistent definition of timeout across the Kafka interfaces, there
>> > > would be no problem. And I also agree it is better if we can make the
>> > > producer block when close() is called from the sender thread so the
>> > > user will notice something went wrong.
>> > >
>> > > Thanks.
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On 3/14/15, 11:37 AM, "Jay Kreps"  wrote:
>> > >
>> > > >Hey Jiangjie,
>> > > >
>> > > >I think this is going to be very confusing that
>> > > >  close(0) waits indefinitely and
>> > > >  close(-1) waits for 0.
>> > > >I understand this appears in other apis, but it is a constant cause of
>> > > >bugs. Let's not repeat that mistake.
>> > > >
>> > > >Let's make close(0) wait for 0. We don't need a way to wait indefinitely
>> > > >as we already have close(), so having a magical constant for that is
>> > > >redundant.
>> > > >
>> > > >Calling close() from the I/O thread was already possible and would block
>> > > >indefinitely. I think trying to silently change the behavior is probably
>> > > >not right. I.e. if the user calls close() in the callback there is
>> > > >actually some misunderstanding and they need to think more; silently
>> > > >making this not block will hide the problem from them, which is the
>> > > >opposite of what we want.
>> > > >
>> > > >-Jay
>> > > >
>> > > >On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
>> > > >
>> > > >wrote:
>> > > >
>> > > >> Hey Joe & Jay,
>> > > >>
>> > > >> Thanks for the comments on the voting thread. Since it seems we
>> > > >> probably will have more discussion on this, I am just replying from
>> > > >> the discussion thread here.
>> > > >> I’ve updated the KIP page to make it less half-baked; apologies for
>> > > >> the rush...
>> > > >>
>> > > >> The contract in the current KIP is:
>> > > >>   1. close() - wait until all requests either are sent or reach the
>> > > >> request timeout.
>> > > >>   2. close(-1, TimeUnit.MILLISECONDS) - close immediately
>> > > >>   3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e.
>> > > >> wait until all requests are sent or reach the request timeout
>> > > >>   4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending
>> > > >> in 5 milliseconds; if something went wrong, just shut down the
>> > > >> producer anyway, my callback will handle the failures.
>> > > >>
>> >

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Hi Jiangjie,

As far as I understand, calling close() in the ioThread is not common, as it
may only be triggered when we see some non-retriable error. Hence when users
run their programs it is unlikely that close() will be triggered and the
problem detected. So it seems to me that from the error-detection aspect the
two options are about the same, as people will usually detect the problem
from the producer metrics all dropping to 0.

Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin 
wrote:

> It seems there are two options we can choose from when close() is called
> from the sender thread (callback):
> 1. Log an error and close the producer using close(-1)
> 2. Log an error and block.
> (Throwing an exception will not work because we catch all exceptions thrown
> from user callbacks; it would just lead to an error log.)
>
> My concern with the first option is that the producer will be closed even
> though we logged an error. I wonder whether some users would even look at the
> log if the producer closes normally: from the program's behavior, everything
> looks good. If that is the case, the error message we logged will probably
> just be ignored until some day when people check the log and see it.
>
> As for the second option, because the producer does not close but blocks,
> users will notice it the first time they run the program. They will probably
> look at the log to see why the producer could not be closed and will see the
> error log we put there. So they get informed about this misuse of close() in
> the sender thread the first time they run the code instead of some time later.
>
> Personally I prefer the second one because it makes it more obvious that
> something went wrong.
>
> Jiangjie (Becket) Qin
>
> On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:
>
> >Yeah I agree we should not silently change the behavior of the function
> >with the given parameters; and I would prefer error-logging-and-shutdown
> >over blocking when close(>0) is used, since as Neha suggested blocking
> >would also not proceed with sending any data, but will just let users
> >realize the issue later rather than sooner.
> >
> >On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede  wrote:
> >
> >> >
> >> > And I also agree it is better if we can make the producer block when
> >> > close() is called from the sender thread so the user will notice
> >> > something went wrong.
> >>
> >>
> >> This isn't a great experience either. Why can't we just throw an exception
> >> for a behavior we know is incorrect and we'd like the user to know.
> >> Blocking as a means of doing that seems wrong and annoying.
> >>
> >> On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps 
> wrote:
> >>
> >> > Cool.
> >> >
> >> > I think blocking is good or alternately throwing an exception directly
> >> > from close(). Basically I would just worry about subtly doing something
> >> > slightly different from what the user asked for as it will be hard to
> >> > notice that behavior difference.
> >> >
> >> > -Jay
> >> >
> >> > On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
> >> >> >
> >> > wrote:
> >> >
> >> > > Hi Jay,
> >> > >
> >> > > I have modified the KIP as you suggested. I think as long as we have
> >> > > a consistent definition of timeout across the Kafka interfaces, there
> >> > > would be no problem. And I also agree it is better if we can make the
> >> > > producer block when close() is called from the sender thread so the
> >> > > user will notice something went wrong.
> >> > >
> >> > > Thanks.
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > > On 3/14/15, 11:37 AM, "Jay Kreps"  wrote:
> >> > >
> >> > > >Hey Jiangjie,
> >> > > >
> >> > > >I think this is going to be very confusing that
> >> > > >  close(0) waits indefinitely and
> >> > > >  close(-1) waits for 0.
> >> > > >I understand this appears in other apis, but it is a constant cause
> >> > > >of bugs. Let's not repeat that mistake.
> >> > > >
> >> > > >Let's make close(0) wait for 0. We don't need a way to wait
> >> > > >indefinitely as we already have close(), so having a magical constant
> >> > > >for that is redundant.
> >> > > >
> >> > > >Calling close() from the I/O thread was already possible and would
> >> > > >block indefinitely. I think trying to silently change the behavior is
> >> > > >probably not right. I.e. if the user calls close() in the callback
> >> > > >there is actually some misunderstanding and they need to think more;
> >> > > >silently making this not block will hide the problem from them, which
> >> > > >is the opposite of what we want.
> >> > > >
> >> > > >-Jay
> >> > > >
> >> > > >On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin
> >>  >> > >
> >> > > >wrote:
> >> > > >
> >> > > >> Hey Joe & Jay,
> >> > > >>
> >> > > >> Thanks for the comments on the voting thread. Since it seems we
> >> > > >> probably will have more discussion on this, I am just replying
> >> > > >> from the discussion thread here.

Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


Thanks for the patch. A few comments.


core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Do we need to make end volatile since it's being updated in a separate thread?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Would it be better to rename this to something like latencyToComplete?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Variable due doesn't seem to be used?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


I guess the sleep will be added when the actual rate exceeds the target 
rate? Would it be better to rename qtime as requestArrivalTime and interval as 
requestArrivalInterval?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


It would be useful to make the # of keys configurable.



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


So far, we haven't used this syntax for println. For consistency, perhaps 
it's better to use the existing way of string formatting.



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Could we add some comments on the meaning of mu and sigma?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Could we add some comments for the class? In particular, what does lambda 
mean?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


It would be helpful to provide a high level description of what kind of 
distribution we get in the samples. Also, is there a particular reason that we 
pick LogNormal distribution instead of just normal distribution?



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Could we add a bit of comment on how the sampling works? I guess it tries 
to spread the # requests into a 1000ms interval and returns the gap for the 
next request on every next() call?

Also, is there a particular reason that we want to choose exponential 
distribution to spread those requests instead of a simple uniform distribution 
(as done in ProducerPerformance)?
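
For context on that question: exponential inter-arrival gaps make the request 
arrivals a Poisson process, which models independent request sources better 
than uniform spacing does. A hedged sketch of the sampling (not the code 
under review):

    import java.util.Random

    class ExponentialIntervals(requestsPerSec: Double, rand: Random = new Random) {
      // inverse-CDF sampling: gap ~ Exp(rate); mean gap = 1000 / requestsPerSec ms
      def nextGapMs(): Double =
        -math.log(1.0 - rand.nextDouble()) / requestsPerSec * 1000.0
    }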



core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala


Is there a particular reason that we need to override isCompleted()? 
Typically, only tryComplete() and onComplete() need to be overridden in a 
subclass of DelayedOperation.

Actually, I am not sure how we complete the requests before the timeout is 
reached, since there is no explicit call to tryComplete()?


- Jun Rao


On March 10, 2015, 4:41 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31893/
> ---
> 
> (Updated March 10, 2015, 4:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2013
> https://issues.apache.org/jira/browse/KAFKA-2013
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> purgatory micro benchmark
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31893/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76571
---


Working on the remaining comments. Shall update the RB


core/src/main/scala/kafka/cluster/Replica.scala


It isn't required to set the lagBeginValue to the current time, because the 
follower will make a fetch request during that time frame which may or may 
not read up to the log end offset, which starts the clock. But your 
suggestion makes it cleaner, so I'll change it.


- Aditya Auradkar
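
A hedged sketch of the time-based lag tracking under discussion (names and 
shape are assumptions for illustration, not the actual KAFKA-1546 patch):

    class ReplicaLagTracker {
      @volatile private var lagBeginTimeMs: Long = -1L

      def onFetch(fetchOffset: Long, logEndOffset: Long, nowMs: Long): Unit = {
        if (fetchOffset >= logEndOffset) lagBeginTimeMs = -1L  // caught up: clear the clock
        else if (lagBeginTimeMs < 0) lagBeginTimeMs = nowMs    // fell behind: start the clock
      }

      // the ISR shrink check can compare this against a time-based lag threshold
      def lagTimeMs(nowMs: Long): Long =
        if (lagBeginTimeMs < 0) 0L else nowMs - lagBeginTimeMs
    }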


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time
> since the replica last read up to the LEO
> - Using the lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76585
---



core/src/main/scala/kafka/cluster/Partition.scala


Interesting point. I thought that it would be enough to simply check the 
lag value. But yes, this will cause the HW to be inconsistent.


- Aditya Auradkar


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time
> since the replica last read up to the LEO
> - Using the lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-16 Thread Andrii Biletskyi
Jun,

Answering your questions:

101. If I understand you correctly, you are saying future producer versions
(which will be ported to TMR_V1) won't be able to automatically create a
topic (if we unconditionally remove topic creation from there). But we need
to preserve this logic.
Ok, about your proposal: I'm not a big fan either of differentiating clients
directly in the protocol schema. I'm also not sure I understand why
auto.create.topics.enable is a server-side configuration. Can we deprecate
this setting in future versions, add it to the producer, and, based on that,
have the producer create the topic explicitly via a separate adminClient call
upon receiving UnknownTopic?
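
To make the proposed flow concrete, here is a purely hypothetical sketch (none
of these types, calls, or settings exist today; they only illustrate the idea
of moving auto-creation from TMR_V1 into the producer):

{code}
trait AdminClient {
  def createTopic(name: String, partitions: Int, replicationFactor: Int): Unit
}
trait Producer { def send(topic: String, payload: Array[Byte]): Unit }
class UnknownTopicException extends RuntimeException

object AutoCreateSketch {
  def sendWithAutoCreate(producer: Producer, admin: AdminClient,
                         autoCreateTopics: Boolean, // proposed producer-side setting
                         topic: String, payload: Array[Byte]): Unit =
    try producer.send(topic, payload)
    catch {
      case _: UnknownTopicException if autoCreateTopics =>
        admin.createTopic(topic, partitions = 1, replicationFactor = 1)
        producer.send(topic, payload) // retry now that the topic exists
    }
}
{code}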

102.1. Hm, yes. It's because we want to support batching and at the same time
we want to give descriptive error messages for clients. Since AdminClient
holds the context to construct such messages (e.g. the AdminClient layer can
know that InvalidArgumentsCode means one of two cases: either an invalid
number, e.g. -1, or replication-factor was provided while the partitions
argument wasn't), I wrapped responses in Exceptions. But I'm open to any
other ideas; this was just an initial version.
102.2. Yes, I agree. I'll change that to probably some other dto.

Thanks,
Andrii Biletskyi

On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao  wrote:

> Andrii,
>
> 101. That's what I was thinking too, but it may not be that simple. In
> TopicMetadataRequest_V1,
> we can let it not trigger auto topic creation. Then, in the producer side,
> if it gets an UnknownTopicException, it can explicitly issue a
> createTopicRequest for auto topic creation. On the consumer side, it will
> never issue createTopicRequest. This works when auto topic creation is
> enabled on the broker side. However, I am not sure how things will work
> when auto topic creation is disabled on the broker side. In this case, we
> want to have a way to manually create a topic, potentially through admin
> commands. However, then we need a way to distinguish createTopicRequest
> issued from the producer clients and the admin tools. Maybe we can add a
> new field in createTopicRequest and set it differently in the producer
> client and the admin client. However, I am not sure if that's the best
> approach.
>
> 2. Yes, refactoring existing requests is a non-trivial amount of work. I
> posted some comments in KAFKA-1927. We will probably have to fix KAFKA-1927
> first, before adding the new logic in KAFKA-1694. Otherwise, the changes
> will be too big.
>
> 102. About the AdminClient:
> 102.1. It's a bit weird that we return exception in the api. It seems that
> we should either return error code or throw an exception when getting the
> response state.
> 102.2. We probably shouldn't explicitly use the request object in the api.
> Not every request evolution requires an api change.
>
> Thanks,
>
> Jun
>
>
> On Fri, Mar 13, 2015 at 4:08 AM, Andrii Biletskyi <
> andrii.bilets...@stealth.ly> wrote:
>
> > Jun,
> >
> > Thanks for you comments. Answers inline:
> >
> > 100. There are a few fields such as ReplicaAssignment,
> > > ReassignPartitionRequest,
> > > and PartitionsSerialized that are represented as a string, but contain
> > > composite structures in json. Could we flatten them out directly in the
> > > protocol definition as arrays/records?
> >
> >
> > Yes, now with Admin Client this looks a bit weird. My initial motivation
> > was:
> > ReassignPartitionCommand accepts input in json, we want to remain tools'
> > interfaces unchanged, where possible.
> > If we port it to deserialized format, in CLI (/tools project) we will
> have
> > to add some
> > json library since /tools is written in java and we'll need to
> deserialize
> > json file
> > provided by a user. Can we quickly agree on what this library should be
> > (Jackson, GSON, whatever)?
> >
> > 101. Does TopicMetadataRequest v1 still trigger auto topic creation? This
> > > will be a bit weird now that we have a separate topic creation api.
> Have
> > > you thought about how the new createTopicRequest and
> TopicMetadataRequest
> > > v1 will be used in the producer/consumer client, in addition to admin
> > > tools? For example, ideally, we don't want TopicMetadataRequest from
> the
> > > consumer to trigger auto topic creation.
> >
> >
> > I agree, this strange logic should be fixed. I'm not confident in this
> > Kafka part, so correct me if I'm wrong, but it doesn't look like a hard
> > thing to do; I think we can leverage AdminClient for that in the Producer
> > and unconditionally remove topic creation from the TopicMetadataRequest_V1.
> >
> > 2. I think Jay meant getting rid of scala classes
> > > like HeartbeatRequestAndHeader and HeartbeatResponseAndHeader. We did
> > that
> > > as a stop-gap thing when adding the new requests for the consumers.
> > > However, the long term plan is to get rid of all those and just reuse
> the
> > > java request/response in the client. Since this KIP proposes to add a
> > > significant number of new req

[jira] [Comment Edited] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-16 Thread Joseph Holsten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:09 PM:


[~becket_qin] could you provide an example of the worst-case unbalance for this
code? I'm having trouble seeing how the hash-ordered round robin can have a
worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads
selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered
with {{n=2}}, {{m=2}}:

{code}
2 * C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be
pleasant to have different servers using different thread sequences. So this
code can't go in as a modification to the existing RoundRobinAssignor; perhaps
it could go in as a new assignor?


was (Author: josephholsten):
[~becket_qin] could you provide an example of the worst-case unbalance for this
code? I'm having trouble seeing how the hash-ordered round robin can have a
worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads
selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered
with {{n=2}}, {{m=2}}:

{code}
2 * C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be
pleasant to have different servers using different thread sequences. So this
code can't go in as a modification to the existing RoundRobinAssignor; perhaps
it could go in as a new assignor?

> RoundRobinAssignor clusters by consumer
> ---
>
> Key: KAFKA-2019
> URL: https://issues.apache.org/jira/browse/KAFKA-2019
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Joseph Holsten
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
> KAFKA-2019.patch
>
>
> When rolling out a change today, I noticed that some of my consumers are 
> "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
> sorted by toString, which is {{ "%s-%d".format(consumer, threadId)}}. This 
> causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
> that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-16 Thread Joseph Holsten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten commented on KAFKA-2019:
---

[~becket_qin] could you provide an example of the worst-case unbalance for this
code? I'm having trouble seeing how the hash-ordered round robin can have a
worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads
selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered
with {{n=2}}, {{m=2}}:

{code}
2 * C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be
pleasant to have different servers using different thread sequences. So this
code can't go in as a modification to the existing RoundRobinAssignor; perhaps
it could go in as a new assignor?
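
For concreteness, a minimal sketch of the proposed change (illustrative code,
not the committed assignor):

{code}
case class ConsumerThreadId(consumer: String, threadId: Int) {
  override def toString = "%s-%d".format(consumer, threadId)
}

object HashOrderedRoundRobin {
  // Sorting by hashCode (the proposal) interleaves consumers' threads;
  // sorting by toString (the current behavior) keeps them adjacent.
  def assign(partitions: Seq[Int],
             threadIds: Seq[ConsumerThreadId]): Map[Int, ConsumerThreadId] = {
    val ordered = threadIds.sortBy(_.hashCode)
    partitions.zipWithIndex
      .map { case (p, i) => p -> ordered(i % ordered.size) }
      .toMap
  }
}
{code}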

> RoundRobinAssignor clusters by consumer
> ---
>
> Key: KAFKA-2019
> URL: https://issues.apache.org/jira/browse/KAFKA-2019
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Joseph Holsten
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
> KAFKA-2019.patch
>
>
> When rolling out a change today, I noticed that some of my consumers are 
> "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
> sorted by toString, which is {{ "%s-%d".format(consumer, threadId)}}. This 
> causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
> that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-16 Thread Joseph Holsten (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363625#comment-14363625
 ] 

Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:10 PM:


[~becket_qin] could you provide an example of the worst-case unbalance for this
code? I'm having trouble seeing how the hash-ordered round robin can have a
worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered
with {{n=2}}, {{m=2}}:

{code}
2 *  C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be
pleasant to have different servers using different thread sequences. So this
code can't go in as a modification to the existing RoundRobinAssignor; perhaps
it could go in as a new assignor?


was (Author: josephholsten):
[~becket_qin] could you provide an example of the worst-case unbalance for this
code? I'm having trouble seeing how the hash-ordered round robin can have a
worst case of more than number-of-consumer-threads.

Here's my reasoning:

With the current consumer-ordered sort, the threadAssignor will iterate over 
the sequence:

- c0:t0
- c0:t1
- c0:t2
- c0:t3
- c1:t0
- c1:t1
- c1:t2
- c1:t3

With the hash-ordered sort, the sequence could have a best case of:

- c0:t0
- c1:t0
- c0:t1
- c1:t1
- c0:t2
- c1:t2
- c0:t3
- c1:t3

and a worst case identical to the consumer-ordered sort.

For a partition count of {{(n+1)*m}}, consumer-ordered will produce a 
worst-case ordering every time, with {{c0}} always ending up with {{2m}} 
partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will 
produce this sort of worst case:

Likelihood that {{c0}} will have all m threads selected:

{code}
C(n*m,m) / P(n*m,n*m) =
((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that any consumer {{c0..cn}} will have all its threads
selected:

{code}
n * C(n*m,m) / P(n*m,n*m) =
n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!)
{code}

So the likelihood that hash-ordering will be as unbalanced as consumer-ordered
with {{n=2}}, {{m=2}}:

{code}
2 * C(2*2,2) / P(2*2,2*2) =
2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) =
2 * (6) / (24) =
0.5
{code}

And with {{n=2}}, {{m=4}}:

{code}
2 * C(2*4,4) / P(2*4,2*4) =
2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) =
2 * 70 / 40320 =
~ 0.00347
{code}

I totally agree with the point about backward incompatibility; it would not be
pleasant to have different servers using different thread sequences. So this
code can't go in as a modification to the existing RoundRobinAssignor; perhaps
it could go in as a new assignor?

> RoundRobinAssignor clusters by consumer
> ---
>
> Key: KAFKA-2019
> URL: https://issues.apache.org/jira/browse/KAFKA-2019
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Joseph Holsten
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
> KAFKA-2019.patch
>
>
> When rolling out a change today, I noticed that some of my consumers are 
> "greedy", taking far more partitions than others.
> The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
> sorted by toString, which is {{ "%s-%d".format(consumer, threadId)}}. This 
> causes each consumer's threads to be adjacent to each other.
> One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
> that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1546) Automate replica lag tuning

2015-03-16 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-1546:
-
Attachment: KAFKA-1546_2015-03-16_11:31:39.patch

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2015-03-16 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363672#comment-14363672
 ] 

Aditya A Auradkar commented on KAFKA-1546:
--

Updated reviewboard https://reviews.apache.org/r/31967/diff/
 against branch origin/trunk

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 16, 2015, 6:31 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
---

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time
since the replica last read up to the LEO
- Using the lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the
LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 16, 2015, 6:32 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
---

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time
since the replica last read up to the LEO
- Using the lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the
LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Diffs
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs

2015-03-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363678#comment-14363678
 ] 

Jun Rao commented on KAFKA-2020:


Below is the protocol for TopicMetadataResponse. Currently, we do the
following:
1. If the leader is not available, we set the partition level error code to
LeaderNotAvailable.
2. If a non-leader replica is not available, we take that replica out of the
assigned replica list and isr in the response. As an indication for doing
that, we set the partition level error code to ReplicaNotAvailable.

This has a few problems. First, ReplicaNotAvailable probably shouldn't be an 
error, at least for the normal producer/consumer clients that just want to find 
out the leader. Second, it can happen that both the leader and another replica 
are not available at the same time. There is no error code to indicate both. 
Third, even if a replica is not available, it's still useful to return its 
replica id since some clients (e.g. admin tool) may still make use of it.

One way to address this issue is to always return the replica id for leader, 
assigned replicas, and isr regardless of whether the corresponding broker is 
live or not. Since we also return the list of live brokers, the client can 
figure out whether a leader or a replica is live or not and act accordingly. 
This way, we don't need to set the partition level error code when the leader 
or a replica is not available. This doesn't change the wire protocol, but does 
change the semantics, so a new version of the protocol is needed. Since we are
debating evolving TopicMetadataRequest in KIP-4, we can potentially piggyback
on that.

{code}
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]
{code}
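
For concreteness, a sketch of the client-side check that the proposed
semantics enable (types simplified for illustration; not actual client code):

{code}
case class Broker(nodeId: Int, host: String, port: Int)
case class PartitionMetadata(partitionId: Int, leader: Int,
                             replicas: Seq[Int], isr: Seq[Int])

object MetadataLiveness {
  // With replica ids always returned, liveness is derived from the live
  // broker list instead of partition-level error codes, so a dead leader
  // and a dead follower are independently visible.
  def check(liveBrokers: Seq[Broker],
            pm: PartitionMetadata): (Boolean, Seq[Int]) = {
    val live = liveBrokers.map(_.nodeId).toSet
    val leaderLive = live.contains(pm.leader)
    val deadReplicas = pm.replicas.filterNot(live)
    (leaderLive, deadReplicas)
  }
}
{code}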

> I expect ReplicaNotAvailableException to have proper Javadocs
> -
>
> Key: KAFKA-2020
> URL: https://issues.apache.org/jira/browse/KAFKA-2020
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Chris Riccomini
>Assignee: Neha Narkhede
>
> It looks like ReplicaNotAvailableException was copy and pasted from 
> LeaderNotAvailable exception. The Javadocs were never changed. This means 
> that users think that ReplicaNotAvailableException signifies leaders are not 
> available. This is very different from, "I can ignore this exception," which 
> is what the Kafka protocol docs say to do with ReplicaNotAvailableException.
> Related: what's the point of ReplicaNotAvailableException if it's supposed to 
> be ignored?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Yasuhiro Matsuda


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 29
> > 
> >
> > Add a check that codec should not be NoCompression.

Why should the codec not be NoCompression? The code works with NoCompression,
too.


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 97
> > 
> >
> > Could we use comments in 
> > 
> > /**
> >  *
> >  */
> >  
> > format?

Is this comment style prohibited? This class is for internal use with fairly
localized usage.


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 117
> > 
> >
> > We can just pass in the Byte here.

This is a contract of OutputStream.


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 135
> > 
> >
> > Better group the private functions together after the public functions.

Well, I don't think it is a particularly better way to organize code, but if
you insist I can change it.
The Kafka code base doesn't seem to follow that convention...


On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
> > The inheritance of MessageWriter from BufferingOutputStream is a bit 
> > confusing, since it will always use itself in the writePayload function 
> > parameter. 
> > 
> > I feel it is more clear to read the code if we just let MessageWriter 
> > contains a var of BufferingOutputStream; and instead of pass in the 
> > function logic of writing the message, we can just pass in messages and 
> > offsetCounter in the write() call which will then write the messages itself.

It is true that the current code writes only through writePayload. But I wanted
MessageWriter to be a subclass of OutputStream to be more generic, in case we
need to write additional information other than messages in the future.


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31742/
> ---
> 
> (Updated March 4, 2015, 7:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-527
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> less byte copies
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> 9c694719dc9b515fb3c3ae96435a87b334044272 
>   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31742/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
It looks like the problem we want to solve and the purpose we want to
achieve are:
If a user calls close() in a callback, we want to make the user aware that
they should use close(0) instead of close() in the callback.

We have agreed that we will have an error log to inform the user about this
misuse. The options differ in how we can force the user to take a look at
that error log.
There are two scenarios:
1. The user does not expect the program to exit.
2. The user expects the program to exit.

For scenario 1), blocking will probably delay the discovery of the problem,
while calling close(0) exposes the problem quicker. In this scenario the
producer just encounters a send failure while running normally.
For scenario 2), blocking will expose the problem quickly, while calling
close(-1) might hide the problem. This scenario might include: a) a unit test
for a send failure; b) message sending during a close() call from a user thread.

So as a summary table:

            Scenario 1)                  Scenario 2)
Blocking    Delays problem discovery     Guaranteed problem discovery
close(-1)   Immediate problem discovery  Problem might be hidden


Personally I prefer blocking because it seems to provide more guarantees
and is safer.
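
For clarity, the detection both options build on looks roughly like this (an
illustrative sketch, not the real KafkaProducer code):

{code}
class SketchProducer {
  private val senderThread: Thread = new Thread(new Runnable {
    def run(): Unit = () // stands in for the real sender loop
  })

  def close(timeoutMs: Long): Unit = {
    if (Thread.currentThread() eq senderThread) {
      System.err.println("close() with a non-zero timeout was called from " +
        "the sender thread (e.g. inside a callback); use close(0) instead.")
      // Option 1: behave as close(0)/close(-1) and shut down immediately.
      // Option 2 (preferred above): fall through and block, so the misuse
      // cannot go unnoticed.
    }
    // ... normal path: wait up to timeoutMs for pending records to flush ...
  }
}
{code}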

Thanks.

Jiangjie (Becket) Qin


On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:

>Hi Jiangjie,
>
>As far as I understand, calling close() in the ioThread is not common, as
>it may only be triggered when we see some non-retriable error. Hence, when
>users run their program it is unlikely that close() will be triggered and
>the problem will be detected. So it seems to me that from the error
>detection aspect these two options seem to be the same, as people will
>usually detect it from the producer metrics all dropping to 0.
>
>Guozhang
>
>On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin 
>wrote:
>
>> It seems there are two options we can choose from when close() is called
>> from sender thread (callback):
>> 1. Log an error and close the producer using close(-1)
>> 2. Log an error and block.
>> (Throwing an exception will not work because we catch all the exceptions
>> thrown from the user callback. It will just lead to an error log.)
>>
>> My concern for the first option is that the producer will be closed even
>> if we logged an error. I am wondering if some users would not even take a
>> look at the log if the producer is closed normally. Because from the
>> program's behavior, everything looks good. If that is the case, the error
>> message we logged probably will just be ignored until some day when people
>> check the log and see it.
>>
>> As for the second option, because the producer does not close but blocks,
>> users will notice this the first time they run the program. They probably
>> will look at the log to see why the producer could not be closed, and they
>> will see the error log we put there. So they will get informed about this
>> misuse of close() in the sender thread the first time they run the code
>> instead of some time later.
>>
>> Personally I prefer the second one because it is more obvious that
>> something was wrong.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:
>>
>> >Yeah, I agree we should not silently change the behavior of the function
>> >with the given parameters; and I would prefer
>> >error-logging-and-shutdown
>> >over blocking when close(>0) is used, since as Neha suggested blocking
>> >would also not proceed with sending any data, but will just let users
>> >realize the issue later rather than sooner.
>> >
>> >On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede 
>>wrote:
>> >
>> >> >
>> >> > And I also agree it is better if we can make producer block when
>> >> > close() is called from sender thread so user will notice something
>> >>went
>> >> > wrong.
>> >>
>> >>
>> >> This isn't a great experience either. Why can't we just throw an
>> >>exception
>> >> for a behavior we know is incorrect and we'd like the user to know.
>> >> Blocking as a means of doing that seems wrong and annoying.
>> >>
>> >> On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps 
>> wrote:
>> >>
>> >> > Cool.
>> >> >
>> >> > I think blocking is good or alternately throwing an exception
>>directly
>> >> from
>> >> > close(). Basically I would just worry about subtly doing something
>> >> slightly
>> >> > different from what the user asked for as it will be hard to notice
>> >>that
>> >> > behavior difference.
>> >> >
>> >> > -Jay
>> >> >
>> >> > On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
>> >>> >> >
>> >> > wrote:
>> >> >
>> >> > > Hi Jay,
>> >> > >
>> >> > > I have modified the KIP as you suggested. I thinks as long as we
>> >>have
>> >> > > consistent define for timeout across Kafka interface, there would
>> >>be no
>> >> > > problem. And I also agree it is better if we can make producer
>>block
>> >> when
>> >> > > close() is called from sender thread so user will notice
>>something
>> >>went
>> >> > > wrong.
>> >> > >
>> >> > > Thanks.
>> >> > >
>> >> > > Jiangjie (Becket) Qin
>> >> > >
>> >>

Re: Review Request 32061: WIP for KAFKA-2015 plus some minor fixes in new consumer

2015-03-16 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32061/#review76630
---

Ship it!


Ship It!

- Onur Karaman


On March 13, 2015, 10:26 p.m., Guozhang Wang wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32061/
> ---
> 
> (Updated March 13, 2015, 10:26 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2015
> https://issues.apache.org/jira/browse/KAFKA-2015
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> NOTE: without the rebalance implementation a single consumer will try to 
> subscribe to all partitions of the given topic.
> 
> 
> Diffs
> -
> 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  436f9b2a843bc8c44d17403f5880b6736a5d56a8 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  8b71fbad5c404d3f23137e153d6376de9f82b823 
>   config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847 
>   core/src/main/scala/kafka/consumer/BaseConsumer.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
> 910691e88ccc66a1542d0ea85bb2f732861d805e 
>   core/src/main/scala/kafka/tools/ConsoleProducer.scala 
> 00265f9f4a4b6c6a9aa023e5be5faf297f77bf31 
> 
> Diff: https://reviews.apache.org/r/32061/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>



[jira] [Created] (KAFKA-2022) simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails instead of being trapped in the fetchResponse a

2015-03-16 Thread Muqeet Mohammed Ali (JIRA)
Muqeet Mohammed Ali created KAFKA-2022:
--

 Summary: simpleconsumer.fetch(req) throws a 
java.nio.channels.ClosedChannelException: null exception when the original 
leader fails instead of being trapped in the fetchResponse api while consuming 
messages
 Key: KAFKA-2022
 URL: https://issues.apache.org/jira/browse/KAFKA-2022
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.1
 Environment: 3 Linux nodes with both ZooKeeper & brokers running under 
respective users on each.
Reporter: Muqeet Mohammed Ali
Assignee: Neha Narkhede


simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: 
null when the original leader fails, instead of the failure being surfaced 
through the fetchResponse API while consuming messages. My understanding was 
that any fetch failure could be detected via the fetchResponse.hasError() call 
and then handled by fetching the new leader. Below is the relevant code snippet 
from the simple consumer, with comments marking the line causing the exception. 
Can you please comment on this?

if (simpleconsumer == null) {
    simpleconsumer = new SimpleConsumer(leaderAddress.getHostName(),
                                        leaderAddress.getPort(),
                                        consumerTimeout,
                                        consumerBufferSize,
                                        consumerId);
}

FetchRequest req = new FetchRequestBuilder()
        .clientId(getConsumerId())
        .addFetch(topic, partition, offsetManager.getTempOffset(), consumerBufferSize)
        // Note: the fetchSize might need to be increased
        // if large batches are written to Kafka
        .build();

// the exception is thrown at the line below
FetchResponse fetchResponse = simpleconsumer.fetch(req);

if (fetchResponse.hasError()) {
    numErrors++;
    etc...
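
A common workaround, sketched below under the assumption of the Scala
SimpleConsumer API (this illustrates client-side handling, not a fix for the
behavior reported here): transport-level failures such as a dead leader
surface as exceptions from fetch() itself, not through
fetchResponse.hasError(), so both paths need handling.

{code}
import java.nio.channels.ClosedChannelException
import kafka.api.{FetchRequest, FetchResponse}
import kafka.consumer.SimpleConsumer

object LeaderAwareFetch {
  def fetchOrReconnect(consumer: SimpleConsumer,
                       req: FetchRequest): Option[FetchResponse] =
    try Some(consumer.fetch(req))
    catch {
      case _: ClosedChannelException =>
        consumer.close()
        None // caller should re-query metadata for the new leader and retry
    }
}
{code}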



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Thanks Jiangjie,

After talking to you offline about this, I have been convinced and have
changed my preference to blocking. The immediate-shutdown approach is indeed
unsafe in some cases.

Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin 
wrote:

> It looks like the problem we want to solve and the purpose we want to
> achieve are:
> If a user calls close() in a callback, we want to make the user aware that
> they should use close(0) instead of close() in the callback.
>
> We have agreed that we will have an error log to inform the user about this
> misuse. The options differ in how we can force the user to take a look at
> that error log.
> There are two scenarios:
> 1. The user does not expect the program to exit.
> 2. The user expects the program to exit.
>
> For scenario 1), blocking will probably delay the discovery of the problem,
> while calling close(0) exposes the problem quicker. In this scenario the
> producer just encounters a send failure while running normally.
> For scenario 2), blocking will expose the problem quickly, while calling
> close(-1) might hide the problem. This scenario might include: a) a unit
> test for a send failure; b) message sending during a close() call from a
> user thread.
>
> So as a summary table:
>
>             Scenario 1)                  Scenario 2)
>
> Blocking    Delays problem discovery     Guaranteed problem discovery
>
> close(-1)   Immediate problem discovery  Problem might be hidden
>
>
> Personally I prefer blocking because it seems to provide more guarantees
> and is safer.
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
>
> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
>
> >Hi Jiangjie,
> >
> >As far as I understand, calling close() in the ioThread is not common, as
> >it may only be triggered when we see some non-retriable error. Hence, when
> >users run their program it is unlikely that close() will be triggered and
> >the problem will be detected. So it seems to me that from the error
> >detection aspect these two options seem to be the same, as people will
> >usually detect it from the producer metrics all dropping to 0.
> >
> >Guozhang
> >
> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin 
> >wrote:
> >
> >> It seems there are two options we can choose from when close() is called
> >> from sender thread (callback):
> >> 1. Log an error and close the producer using close(-1)
> >> 2. Log an error and block.
> >> (Throwing an exception will not work because we catch all the exceptions
> >> thrown from the user callback. It will just lead to an error log.)
> >>
> >> My concern for the first option is that the producer will be closed even
> >> if we logged an error. I am wondering if some users would not even take a
> >> look at the log if the producer is closed normally. Because from the
> >> program's behavior, everything looks good. If that is the case, the error
> >> message we logged probably will just be ignored until some day when people
> >> check the log and see it.
> >>
> >> As for the second option, because the producer does not close but blocks,
> >> users will notice this the first time they run the program. They probably
> >> will look at the log to see why the producer could not be closed, and they
> >> will see the error log we put there. So they will get informed about this
> >> misuse of close() in the sender thread the first time they run the code
> >> instead of some time later.
> >>
> >> Personally I prefer the second one because it is more obvious that
> >> something was wrong.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:
> >>
> >> >Yeah, I agree we should not silently change the behavior of the function
> >> >with the given parameters; and I would prefer
> >> >error-logging-and-shutdown
> >> >over blocking when close(>0) is used, since as Neha suggested blocking
> >> >would also not proceed with sending any data, but will just let users
> >> >realize the issue later rather than sooner.
> >> >
> >> >On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede 
> >>wrote:
> >> >
> >> >> >
> >> >> > And I also agree it is better if we can make producer block when
> >> >> > close() is called from sender thread so user will notice something
> >> >>went
> >> >> > wrong.
> >> >>
> >> >>
> >> >> This isn't a great experience either. Why can't we just throw an
> >> >>exception
> >> >> for a behavior we know is incorrect and we'd like the user to know.
> >> >> Blocking as a means of doing that seems wrong and annoying.
> >> >>
> >> >> On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps 
> >> wrote:
> >> >>
> >> >> > Cool.
> >> >> >
> >> >> > I think blocking is good or alternately throwing an exception
> >>directly
> >> >> from
> >> >> > close(). Basically I would just worry about subtly doing something
> >> >> slightly
> >> >> > different from what the user asked for as it will be hard to notice
> >> >>that
> >> >> > behavior difference.
> >> >> >
> >> >> > -Jay
> >> >> >
> >> >> > On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin
> >> >> >> >> >
> >>

Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda


> On March 16, 2015, 5:17 p.m., Jun Rao wrote:
> > core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193
> > 
> >
> > Is there a particular reason that we need to overwrite isCompleted()? 
> > Typically, only tryComplete() and onComplete() need to be overwritten in a 
> > subclass of DelayedOperation.
> > 
> > Actually, I am not sure how we complete the requests before the timeout 
> > is reached since there is no explicit call to tryComplete()?

isCompleted checks whether the current time has passed the scheduled completion
time rather than whether forceComplete has been called. That makes isCompleted
always accurate.

The purgatory checks the watcher lists every so often and calls isCompleted.
Calling forceComplete from isCompleted ensures that a completed request is
removed from the timing wheels in the new implementation. In terms of timing,
this is not very accurate because completed requests may stay around longer
than they should. This doesn't affect the old implementation at all, but it may
impose some overhead on the new implementation. Still, the new one outperforms
the old one.

It would be ideal if we could call forceComplete at the scheduled completion
time, but that requires another timer (DelayQueue or Timer). I think that is
too much overhead for measuring purgatory performance, and it is also hard to
guarantee that such a timer works accurately in this test setting.
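
For readers following the thread, the override being described looks roughly
like this (a sketch assuming the DelayedOperation API; not the exact benchmark
code):

{code}
import kafka.server.DelayedOperation

class ScheduledFakeOperation(delayMs: Long, completionTimeMs: Long)
  extends DelayedOperation(delayMs) {

  // Completion is defined by wall-clock time; forceComplete() is invoked
  // lazily the first time the purgatory asks, which also removes the
  // operation from the timing wheels.
  override def isCompleted(): Boolean = {
    val done = System.currentTimeMillis >= completionTimeMs
    if (done) forceComplete()
    done
  }

  override def tryComplete(): Boolean = isCompleted()
  override def onComplete(): Unit = ()
  override def onExpiration(): Unit = ()
}
{code}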


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


On March 16, 2015, 8:23 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31893/
> ---
> 
> (Updated March 16, 2015, 8:23 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2013
> https://issues.apache.org/jira/browse/KAFKA-2013
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> purgatory micro benchmark
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31893/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 8:23 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363843#comment-14363843
 ] 

Yasuhiro Matsuda commented on KAFKA-2013:
-

Updated reviewboard https://reviews.apache.org/r/31893/diff/
 against branch origin/trunk

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_13:23:38.patch

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Guozhang Wang


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 29
> > 
> >
> > Add a check that codec should not be NoCompression.
> 
> Yasuhiro Matsuda wrote:
> Why the codec should not be NoCompression? The code works with 
> NoCompression, too.

That's right, it works with NoCompression too. My bad.


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 97
> > 
> >
> > Could we use comments in 
> > 
> > /**
> >  *
> >  */
> >  
> > format?
> 
> Yasuhiro Matsuda wrote:
> Is this comment style prohibitted? This class is for internal use with 
> fairly localized usage.

We used to maintain a coding guide 
(http://kafka.apache.org/coding-guide.html), but I think we did not do a great 
job enforcing it, and the page itself is also a bit outdated. Jay added the 
checkstyle package in order to improve on this aspect, but it does not have 
comment rules in it. I thought the common rules in the code were:

1. Use /* */ for class definitions and user-facing API comments.
2. Use // for in-function comments (no capitalization).

But they are somehow not programmatically enforced. Anyway, let me know if you 
think that is too picky, and we can try to be more flexible in terms of commenting.
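
As a tiny illustration of the two conventions:

{code}
/**
 * Class definitions and user-facing APIs use this block style.
 */
class Example {
  def compute(x: Int): Int = {
    // in-function comments use this style (no capitalization)
    x + 1
  }
}
{code}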


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 117
> > 
> >
> > We can just pass in the Byte here.
> 
> Yasuhiro Matsuda wrote:
> This is a contract of OutputStream.

Cool. Could we add the optional "override" here?


> On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/message/MessageWriter.scala, line 135
> > 
> >
> > Better group the private functions together after the public functions.
> 
> Yasuhiro Matsuda wrote:
> Well, I don't think it is particulary better way to organize code, but if 
> you insist I can change it.
> Kafka code base doesn't seem to follow that convention...

Again, we did not do a good job enforcing any sort of such coding style, and it 
may be just me being unreasonable about these rules. I am open to other 
reviewers taking a look and giving their thoughts.


On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
> > The inheritance of MessageWriter from BufferingOutputStream is a bit 
> > confusing, since it will always use itself in the writePayload function 
> > parameter. 
> > 
> > I feel it is more clear to read the code if we just let MessageWriter 
> > contains a var of BufferingOutputStream; and instead of pass in the 
> > function logic of writing the message, we can just pass in messages and 
> > offsetCounter in the write() call which will then write the messages itself.
> 
> Yasuhiro Matsuda wrote:
> It is true that the current code writes only through writePayload. But I 
> wanted MessageWriter to be a subclass of OutputStream to be more generic in 
> case we need to write additional inforation other than messages in future.

As of now, MessageWriter's only public function is write(key, 
codec)(valueWriteFunction), which is used for writing a single message. Also, 
its private functions withCrc32Prefix / withLengthPrefix are only used for 
message writing. So your motivation regarding future extensions is a bit 
unclear. Could you elaborate a bit more on that?


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31742/
> ---
> 
> (Updated March 4, 2015, 7:43 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-527
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> less byte copies
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> 9c694719dc9b515fb3c3ae96435a87b334044272 
>   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31742/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-16 Thread Jun Rao
101. There may be a use case where you only want the topics to be created
manually by admins. Currently, you can do that by disabling auto topic
creation and issue topic creation from the TopicCommand. If we disable auto
topic creation completely on the broker and don't have a way to distinguish
between topic creation requests from the regular clients and the admin, we
can't support manual topic creation any more. I was thinking that another
way of distinguishing the clients making the topic creation requests is
using clientId. For example, the admin tool can set it to something like
admin and the broker can treat that clientId specially.
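
To illustrate (a hedged sketch; the constant and method below are
hypothetical, not existing broker code), the check could be as simple as:

object TopicCreationGate {
  // "admin" is an assumed reserved clientId, named here for illustration only
  val AdminClientId = "admin"

  def mayCreateTopic(clientId: String, autoCreateEnabled: Boolean): Boolean =
    autoCreateEnabled || clientId == AdminClientId
}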

Also, there is a related discussion in KAFKA-2020. Currently, we do the
following in TopicMetadataResponse:

1. If leader is not available, we set the partition level error code to
LeaderNotAvailable.
2. If a non-leader replica is not available, we take that replica out of
the assigned replica list and isr in the response. As an indication for
doing that, we set the partition level error code to ReplicaNotAvailable.

This has a few problems. First, ReplicaNotAvailable probably shouldn't be
an error, at least for the normal producer/consumer clients that just want
to find out the leader. Second, it can happen that both the leader and
another replica are not available at the same time. There is no error code
to indicate both. Third, even if a replica is not available, it's still
useful to return its replica id since some clients (e.g. admin tool) may
still make use of it.

One way to address this issue is to always return the replica id for
leader, assigned replicas, and isr regardless of whether the corresponding
broker is live or not. Since we also return the list of live brokers, the
client can figure out whether a leader or a replica is live or not and act
accordingly. This way, we don't need to set the partition level error code
when the leader or a replica is not available. This doesn't change the wire
protocol, but does change the semantics. Since we are evolving the protocol
of TopicMetadataRequest here, we can potentially piggyback the change.
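
As a rough sketch of how a client could consume those semantics (the types
below are made up for illustration, not the actual response classes), liveness
is derived by intersecting the returned ids with the live-broker list:

case class PartitionMeta(leader: Int, replicas: Seq[Int], isr: Seq[Int])

def describeLiveness(meta: PartitionMeta, liveBrokers: Set[Int]): Unit = {
  // no partition-level error code needed: liveness is computed client-side
  val leaderLive = liveBrokers.contains(meta.leader)
  val deadReplicas = meta.replicas.filterNot(liveBrokers.contains)
  println(s"leader ${meta.leader} live=$leaderLive, dead replicas=$deadReplicas")
}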

102.1 For those types of errors due to invalid input, shouldn't we just
guard it at parameter validation time and throw InvalidArgumentException
without even sending the request to the broker?

Thanks,

Jun


On Mon, Mar 16, 2015 at 10:37 AM, Andrii Biletskyi <
andrii.bilets...@stealth.ly> wrote:

> Jun,
>
> Answering your questions:
>
> 101. If I understand you correctly, you are saying future producer versions
> (which will be ported to TMR_V1) won't be able to automatically create
> topics (if we unconditionally remove topic creation from there). But we need
> to preserve this logic.
> Ok, about your proposal: I'm not a big fan either when it comes to
> differentiating clients directly in the protocol schema. And also I'm not
> sure I understand at all why auto.create.topics.enable is a server-side
> configuration. Can we deprecate this setting in future versions, add it to
> the producer, and based on that, upon receiving UnknownTopic, create the
> topic explicitly via a separate producer call to adminClient?
>
> 102.1. Hm, yes. It's because we want to support batching and at the same
> time we want to give descriptive error messages for clients. Since
> AdminClient holds the context to construct such messages (e.g. the
> AdminClient layer can know that InvalidArgumentsCode means two cases:
> either an invalid number - e.g. -1 - or replication-factor was provided
> while the partitions argument wasn't), I wrapped responses in Exceptions.
> But I'm open to any other ideas; this was just the initial version.
> 102.2. Yes, I agree. I'll change that to probably some other DTO.
>
> Thanks,
> Andrii Biletskyi
>
> On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao  wrote:
>
> > Andrii,
> >
> > 101. That's what I was thinking too, but it may not be that simple. In
> > TopicMetadataRequest_V1,
> > we can let it not trigger auto topic creation. Then, in the producer
> side,
> > if it gets an UnknownTopicException, it can explicitly issue a
> > createTopicRequest for auto topic creation. On the consumer side, it will
> > never issue createTopicRequest. This works when auto topic creation is
> > enabled on the broker side. However, I am not sure how things will work
> > when auto topic creation is disabled on the broker side. In this case, we
> > want to have a way to manually create a topic, potentially through admin
> > commands. However, then we need a way to distinguish createTopicRequest
> > issued from the producer clients and the admin tools. May be we can add a
> > new field in createTopicRequest and set it differently in the producer
> > client and the admin client. However, I am not sure if that's the best
> > approach.
> >
> > 2. Yes, refactoring existing requests is a non-trivial amount of work. I
> > posted some comments in KAFKA-1927. We will probably have to fix
> KAFKA-1927
> > first, before adding the new logic in KAFKA-1694. O

Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 9:13 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363957#comment-14363957
 ] 

Yasuhiro Matsuda commented on KAFKA-2013:
-

Updated reviewboard https://reviews.apache.org/r/31893/diff/
 against branch origin/trunk

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
> KAFKA-2013_2015-03-16_14:13:20.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_14:13:20.patch

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
> KAFKA-2013_2015-03-16_14:13:20.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Michael Herstine

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---



core/src/main/scala/kafka/network/SocketServer.scala


`{want,needs}ClientAuth` can be tricky-- check the javadoc for 
`SSLEngine.setWantClientAuth`... there are actually only three states: 
required, requested, not desired, and the last call to `{want,needs}ClientAuth` 
"wins".

So, if "needs" is True and "wants" is false, invoking the methods in this 
order will actually overwrite the "needs" setting. Recommend something like:

if (sslConnectionConfig.needClientAuth) {
    sslEngine.setNeedClientAuth(true);
} else {
    sslEngine.setNeedClientAuth(false);
    sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
}



core/src/main/scala/kafka/network/ssl/SSLChannel.scala


Suppose SSLEngine has written the current message (via `wrap`) to 
`netOutBuffer`, but that the write call in `flush`, when invoked from 
`handshakeWrap`, didn't write the entire buffer to the underlying socket.

Would not `handshakeStatus` as reported from SSLEngine now be 
`NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the 
`NEEDS_UNWRAP` case?

Or do we not fall through in Scala case statements?



core/src/main/scala/kafka/network/ssl/SSLChannel.scala


Not sure about this, but do we want to update the position & limit of the 
buffer? We flipped it after the last read, but I can't remember if 
SSLEngine.unwrap will update them if there's an incomplete packet (i.e. in the 
BUFFER_UNDERFLOW case).


Just a few questions on some corner cases... handling all the possibilities 
when handshaking over NIO is really tough.

- Michael Herstine


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31958/
> ---
> 
> (Updated March 11, 2015, 9:36 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1684
> https://issues.apache.org/jira/browse/KAFKA-1684
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1684. Implement TLS/SSL authentication.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
>   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> dddef938fabae157ed8644536eb1a2f329fb42b7 
>   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31958/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Thanks Guozhang. It wouldn’t be as thoroughly considered without
discussing with you :)

Jiangjie (Becket) Qin

On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:

>Thanks Jiangjie,
>
>After talking to you offline on this, I have been convinced and changed my
>preference to blocking. The immediate shutdown approach does have some
>unsafeness in some cases.
>
>Guozhang
>
>On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin 
>wrote:
>
>> It looks that the problem we want to solve and the purpose we want to
>> achieve is:
>> If user uses close() in callback, we want to let user be aware that they
>> should use close(0) instead of close() in the callback.
>>
>> We have agreed that we will have an error log to inform user about this
>> mis-usage. The options differ in the way how we can force user to take a
>> look at that error log.
>> There are two scenarios:
>> 1. User does not expect the program to exit.
>> 2. User expect the program to exit.
>>
>> For scenario 1), blocking will probably delay the discovery of the
>> problem. Calling close(0) exposes the problem quicker. In this scenario
>> producer just encounter a send failure when running normally.
>> For scenario 2), blocking will expose the problem quick. Calling
>>close(-1)
>> might hide the problem. This scenario might include: a) Unit test for a
>> send failure. b) Message sending during a close() call from a user
>>thread.
>>
>> So as a summary table:
>>
>>             Scenario 1)                  Scenario 2)
>>
>> Blocking    Delay problem discovery      Guaranteed problem discovery
>>
>> Close(-1)   Immediate problem discovery  Problem might be hidden
>>
>>
>> Personally I prefer blocking because it seems providing more guarantees
>> and safer.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>>
>> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
>>
>> >HI Jiangjie,
>> >
>> >As far as I understand calling close() in the ioThread is not common,
>>as
>> >it
>> >may only trigger when we saw some non-retriable error. Hence when user
>>run
>> >their program it is unlikely that close() will be triggered and problem
>> >will be detected. So it seems to me that from the error detection
>>aspect
>> >these two options seems to be the same as people will usually detect it
>> >from the producer metrics all dropping to 0.
>> >
>> >Guozhang
>> >
>> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
>>
>> >wrote:
>> >
>> >> It seems there are two options we can choose from when close() is
>>called
>> >> from sender thread (callback):
>> >> 1. Log an error and close the producer using close(-1)
>> >> 2. Log an error and block.
>> >> (Throwing an exception will not work because we catch all the
>>exception
>> >> thrown from user callback. It will just lead to an error log.)
>> >>
>> >> My concern for the first option is that the producer will be closed
>>even
>> >> if we logged and error. I am wondering if some user would not even
>>take
>> >>a
>> >> look at the log if producer is closed normally. Because from the
>> >>programs
>> >> behavior, everything looks good. If that is the case, the error
>>message
>> >>we
>> >> logged probably will just be ignored until some day when people check
>> >>the
>> >> log and see it.
>> >>
>> >> As for the second option, because producer does not close but blocks.
>> >>User
>> >> will notice this the first time they run the program. They probably
>>will
>> >> look at the log to see why producer could not be closed and they will
>> >>see
>> >> the error log we put there. So they will get informed about this
>> >>mis-usage
>> >> of close() in sender thread the first time they run the code instead
>>of
>> >> some time later.
>> >>
>> >> Personally I prefer the second one because it is more obvious that
>> >> something was wrong.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:
>> >>
>> >> >Yeah I agree we should not silently change the behavior of the
>>function
>> >> >with the given parameters; and I would prefer
>> >>error-logging-and-shutdown
>> >> >over blocking when close(>0) is used, since as Neha suggested
>>blocking
>> >> >would also not proceed with sending any data, bu will just let
>>users to
>> >> >realize the issue later than sooner.
>> >> >
>> >> >On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede 
>> >>wrote:
>> >> >
>> >> >> >
>> >> >> > And I also agree it is better if we can make producer block when
>> >> >> > close() is called from sender thread so user will notice
>>something
>> >> >>went
>> >> >> > wrong.
>> >> >>
>> >> >>
>> >> >> This isn't a great experience either. Why can't we just throw an
>> >> >>exception
>> >> >> for a behavior we know is incorrect and we'd like the user to
>>know.
>> >> >> Blocking as a means of doing that seems wrong and annoying.
>> >> >>
>> >> >> On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps 
>> >> wrote:
>> >> >>
>> >> >> > Cool.
>> >> >> >
>> >> >> > I think blocking is good or alternately throwing an exception
>> >>directly
>> >> >> from
>

Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Sriharsha Chintalapani


> On March 16, 2015, 9:24 p.m., Michael Herstine wrote:
> > core/src/main/scala/kafka/network/ssl/SSLChannel.scala, line 137
> > 
> >
> > Suppose SSLEngine has written the current message (via `wrap`) to 
> > `netOutBuffer`, but that the write call in `flush`, when invoked from 
> > `handshakeWrap`, didn't write the entire buffer to the underlying socket.
> > 
> > Would not `handshakeStatus` as reported from SSLEngine now be 
> > `NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the 
> > `NEEDS_UNWRAP` case?
> > 
> > Or do we not fall through in Scala case statements?

Thanks for the review. Ideally it should fall through to NEEDS_UNWRAP, but 
since Scala case statements don't allow Java-style fall-through, I am looking 
at alternatives.
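
One possible shape for that (a heavily simplified sketch with the wrap/unwrap
bodies stubbed out; this is not the actual patch) is to re-dispatch on the
engine's current handshake status in a loop:

import javax.net.ssl.SSLEngineResult.HandshakeStatus
import javax.net.ssl.SSLEngineResult.HandshakeStatus._

// Each pass re-reads the current status, so an incomplete flush after a wrap
// is handled on a later iteration instead of by falling through.
def step(status: HandshakeStatus): HandshakeStatus = status match {
  case NEED_WRAP   => /* wrap, then flush netOutBuffer */ NEED_UNWRAP
  case NEED_UNWRAP => /* read from the socket, then unwrap */ NEED_TASK
  case NEED_TASK   => /* run the engine's delegated tasks */ FINISHED
  case other       => other
}

var status: HandshakeStatus = NEED_WRAP
while (status != FINISHED && status != NOT_HANDSHAKING)
  status = step(status)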


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31958/
> ---
> 
> (Updated March 11, 2015, 9:36 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1684
> https://issues.apache.org/jira/browse/KAFKA-1684
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1684. Implement TLS/SSL authentication.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
>   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> dddef938fabae157ed8644536eb1a2f329fb42b7 
>   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31958/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: Review Request 31958: Patch for KAFKA-1684

2015-03-16 Thread Sriharsha Chintalapani


> On March 16, 2015, 9:24 p.m., Michael Herstine wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 318
> > 
> >
> > `{want,needs}ClientAuth` can be tricky-- check the javadoc for 
> > `SSLEngine.setWantClientAuth`... there are actually only three states: 
> > required, requested, not desired, and the last call to 
> > `{want,needs}ClientAuth` "wins".
> > 
> > So, if "needs" is True and "wants" is false, invoking the methods in 
> > this order will actually overwrite the "needs" setting. Recommend something 
> > like:
> > 
> > if (sslConnectionConfig.needClientAuth) {
> >     sslEngine.setNeedClientAuth(true);
> > } else {
> >     sslEngine.setNeedClientAuth(false);
> >     sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
> > }

Thanks for pointing it out. I'll fix that.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31958/#review76640
---


On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31958/
> ---
> 
> (Updated March 11, 2015, 9:36 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1684
> https://issues.apache.org/jira/browse/KAFKA-1684
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1684. Implement TLS/SSL authentication.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/network/Channel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 
>   core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> dddef938fabae157ed8644536eb1a2f329fb42b7 
>   core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31958/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Sriharsha Chintalapani
> 
>



Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/
---

(Updated March 16, 2015, 9:39 p.m.)


Review request for kafka.


Bugs: KAFKA-2013
https://issues.apache.org/jira/browse/KAFKA-2013


Repository: kafka


Description
---

purgatory micro benchmark


Diffs (updated)
-

  core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31893/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364008#comment-14364008
 ] 

Yasuhiro Matsuda commented on KAFKA-2013:
-

Updated reviewboard https://reviews.apache.org/r/31893/diff/
 against branch origin/trunk

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
> KAFKA-2013_2015-03-16_14:13:20.patch, KAFKA-2013_2015-03-16_14:39:07.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-2013:

Attachment: KAFKA-2013_2015-03-16_14:39:07.patch

> benchmark test for the purgatory
> 
>
> Key: KAFKA-2013
> URL: https://issues.apache.org/jira/browse/KAFKA-2013
> Project: Kafka
>  Issue Type: Test
>  Components: purgatory
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
>Priority: Trivial
> Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, 
> KAFKA-2013_2015-03-16_14:13:20.patch, KAFKA-2013_2015-03-16_14:39:07.patch
>
>
> We need a micro benchmark test for measuring the purgatory performance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31893: Patch for KAFKA-2013

2015-03-16 Thread Yasuhiro Matsuda


> On March 16, 2015, 5:17 p.m., Jun Rao wrote:
> > core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193
> > 
> >
> > Is there a particular reason that we need to overwrite isCompleted()? 
> > Typically, only tryComplete() and onComplete() need to be overwritten in a 
> > subclass of DelayedOperation.
> > 
> > Actually, I am not sure how we complete the requests before the timeout 
> > is reached since there is no explict call for tryComplete()?
> 
> Yasuhiro Matsuda wrote:
> isCompleted checks if the current time has passed the scheduled completion 
> time rather than if forceComplete has been called. It makes isCompleted 
> always accurate.
> 
> Purgatory checks watcher lists every so often and calls isCompleted. 
> Calling forceComplete from isCompleted ensures that a completed request is 
> removed from the timing wheels in the new implementation. In terms of timing, 
> this is not very accurate because completed requests may stay longer than 
> they should. This doesn't affect the old implementation at all, but it may 
> impose some overhead on the new implementation. Still, the new one 
> outperforms the old one.
> 
> It would be ideal if we could call forceComplete at the scheduled completion 
> time, but that requires another timer (DelayQueue or Timer). I think that is 
> too much overhead for measuring purgatory performance. And it is also hard to 
> guarantee such a timer works accurately in this test setting.

It looks like the watcher list check happens frequently enough in both the new 
and old implementations. The average delay from the completion time to the 
actual forceComplete call is several tens of milliseconds (low request rate) to 
sub-millisecond (high request rate).
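
For reference, a much-simplified sketch of the pattern described above (the
real DelayedOperation uses an atomic flag and a Boolean-returning
forceComplete; this is only the shape of the idea):

abstract class TimedDelayedOp(delayMs: Long) {
  private val dueMs = System.currentTimeMillis + delayMs
  @volatile private var completed = false

  def onComplete(): Unit  // subclass hook

  def forceComplete(): Unit = {
    completed = true
    onComplete()
  }

  def isCompleted: Boolean = {
    // completion is detected lazily, whenever the purgatory's periodic
    // watcher-list scan asks; this is the source of the delays measured above
    if (!completed && System.currentTimeMillis >= dueMs)
      forceComplete()
    completed
  }
}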


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31893/#review76566
---


On March 16, 2015, 9:39 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31893/
> ---
> 
> (Updated March 16, 2015, 9:39 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2013
> https://issues.apache.org/jira/browse/KAFKA-2013
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> purgatory micro benchmark
> 
> 
> Diffs
> -
> 
>   core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31893/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jun Rao
How does close(0) work if it's called from the sender thread? If close(0)
needs to wait for the sender thread to join, wouldn't this cause a deadlock?

Thanks,

Jun

On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin 
wrote:

> Thanks Guozhang. It wouldn’t be as thoroughly considered without
> discussing with you :)
>
> Jiangjie (Becket) Qin
>
> On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:
>
> >Thanks Jiangjie,
> >
> >After talking to you offline on this, I have been convinced and changed my
> >preference to blocking. The immediate shutdown approach does have some
> >unsafeness in some cases.
> >
> >Guozhang
> >
> >On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin  >
> >wrote:
> >
> >> It looks that the problem we want to solve and the purpose we want to
> >> achieve is:
> >> If user uses close() in callback, we want to let user be aware that they
> >> should use close(0) instead of close() in the callback.
> >>
> >> We have agreed that we will have an error log to inform user about this
> >> mis-usage. The options differ in the way how we can force user to take a
> >> look at that error log.
> >> There are two scenarios:
> >> 1. User does not expect the program to exit.
> >> 2. User expect the program to exit.
> >>
> >> For scenario 1), blocking will probably delay the discovery of the
> >> problem. Calling close(0) exposes the problem quicker. In this scenario
> >> producer just encounter a send failure when running normally.
> >> For scenario 2), blocking will expose the problem quick. Calling
> >>close(-1)
> >> might hide the problem. This scenario might include: a) Unit test for a
> >> send failure. b) Message sending during a close() call from a user
> >>thread.
> >>
> >> So as a summary table:
> >>
> >>             Scenario 1)                  Scenario 2)
> >>
> >> Blocking    Delay problem discovery      Guaranteed problem discovery
> >>
> >> Close(-1)   Immediate problem discovery  Problem might be hidden
> >>
> >>
> >> Personally I prefer blocking because it seems providing more guarantees
> >> and safer.
> >>
> >> Thanks.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
> >>
> >> >HI Jiangjie,
> >> >
> >> >As far as I understand calling close() in the ioThread is not common,
> >>as
> >> >it
> >> >may only trigger when we saw some non-retriable error. Hence when user
> >>run
> >> >their program it is unlikely that close() will be triggered and problem
> >> >will be detected. So it seems to me that from the error detection
> >>aspect
> >> >these two options seems to be the same as people will usually detect it
> >> >from the producer metrics all dropping to 0.
> >> >
> >> >Guozhang
> >> >
> >> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
> >>
> >> >wrote:
> >> >
> >> >> It seems there are two options we can choose from when close() is
> >>called
> >> >> from sender thread (callback):
> >> >> 1. Log an error and close the producer using close(-1)
> >> >> 2. Log an error and block.
> >> >> (Throwing an exception will not work because we catch all the
> >>exception
> >> >> thrown from user callback. It will just lead to an error log.)
> >> >>
> >> >> My concern for the first option is that the producer will be closed
> >>even
> >> >> if we logged and error. I am wondering if some user would not even
> >>take
> >> >>a
> >> >> look at the log if producer is closed normally. Because from the
> >> >>programs
> >> >> behavior, everything looks good. If that is the case, the error
> >>message
> >> >>we
> >> >> logged probably will just be ignored until some day when people check
> >> >>the
> >> >> log and see it.
> >> >>
> >> >> As for the second option, because producer does not close but blocks.
> >> >>User
> >> >> will notice this the first time they run the program. They probably
> >>will
> >> >> look at the log to see why producer could not be closed and they will
> >> >>see
> >> >> the error log we put there. So they will get informed about this
> >> >>mis-usage
> >> >> of close() in sender thread the first time they run the code instead
> >>of
> >> >> some time later.
> >> >>
> >> >> Personally I prefer the second one because it is more obvious that
> >> >> something was wrong.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >> On 3/15/15, 4:27 PM, "Guozhang Wang"  wrote:
> >> >>
> >> >> >Yeah I agree we should not silently change the behavior of the
> >>function
> >> >> >with the given parameters; and I would prefer
> >> >>error-logging-and-shutdown
> >> >> >over blocking when close(>0) is used, since as Neha suggested
> >>blocking
> >> >> >would also not proceed with sending any data, bu will just let
> >>users to
> >> >> >realize the issue later than sooner.
> >> >> >
> >> >> >On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede 
> >> >>wrote:
> >> >> >
> >> >> >> >
> >> >> >> > And I also agree it is better if we can make producer block when
> >> >> >> > close() is called from sender thread so user will notice
> >>something
> >> >> >>

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jiangjie Qin
Hi Jun,

Close(0) will set two flags in the sender: running=false and a newly added
forceClose=true. It will also set accumulator.closed=true so no further
producer.send() will succeed.
The sender thread will finish executing all the callbacks in the current batch
of responses, then it will see the forceClose flag. It will just fail all
the incomplete batches in the producer and exit.
So close(0) is a non-blocking call and the sender thread will not try to join
itself in close(0).
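
A minimal sketch of that control flow (field names mirror the description
above and are hypothetical, not the actual producer code):

class ProducerSketch {
  @volatile private var running = true
  @volatile private var forceClose = false
  @volatile private var accumulatorClosed = false

  // close(0): flip flags and return immediately; there is no join on the
  // sender thread, so calling it from a callback on that thread cannot deadlock
  def close(timeoutMs: Long): Unit = {
    accumulatorClosed = true       // further send() calls fail fast
    running = false
    if (timeoutMs == 0)
      forceClose = true            // sender fails incomplete batches and exits
  }

  // one iteration of the sender (io) thread's loop
  def senderLoopIteration(): Unit = {
    // ... handle responses and run user callbacks for the current batch ...
    if (forceClose) {
      // fail all incomplete batches here, then let the loop exit
    }
  }
}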

Thanks.

Jiangjie (Becket) Qin

On 3/16/15, 2:50 PM, "Jun Rao"  wrote:

>How does close(0) work if it's called from the sender thread? If close(0)
>needs to wait for the sender thread to join, wouldn't this cause a
>deadlock?
>
>Thanks,
>
>Jun
>
>On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin 
>wrote:
>
>> Thanks Guozhang. It wouldn’t be as thoroughly considered without
>> discussing with you :)
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:
>>
>> >Thanks Jiangjie,
>> >
>> >After talking to you offline on this, I have been convinced and
>>changed my
>> >preference to blocking. The immediate shutdown approach does have some
>> >unsafeness in some cases.
>> >
>> >Guozhang
>> >
>> >On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
>>> >
>> >wrote:
>> >
>> >> It looks that the problem we want to solve and the purpose we want to
>> >> achieve is:
>> >> If user uses close() in callback, we want to let user be aware that
>>they
>> >> should use close(0) instead of close() in the callback.
>> >>
>> >> We have agreed that we will have an error log to inform user about
>>this
>> >> mis-usage. The options differ in the way how we can force user to
>>take a
>> >> look at that error log.
>> >> There are two scenarios:
>> >> 1. User does not expect the program to exit.
>> >> 2. User expect the program to exit.
>> >>
>> >> For scenario 1), blocking will probably delay the discovery of the
>> >> problem. Calling close(0) exposes the problem quicker. In this
>>scenario
>> >> producer just encounter a send failure when running normally.
>> >> For scenario 2), blocking will expose the problem quick. Calling
>> >>close(-1)
>> >> might hide the problem. This scenario might include: a) Unit test
>>for a
>> >> send failure. b) Message sending during a close() call from a user
>> >>thread.
>> >>
>> >> So as a summary table:
>> >>
>> >>             Scenario 1)                  Scenario 2)
>> >>
>> >> Blocking    Delay problem discovery      Guaranteed problem discovery
>> >>
>> >> Close(-1)   Immediate problem discovery  Problem might be hidden
>> >>
>> >>
>> >> Personally I prefer blocking because it seems providing more
>>guarantees
>> >> and safer.
>> >>
>> >> Thanks.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >>
>> >> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
>> >>
>> >> >HI Jiangjie,
>> >> >
>> >> >As far as I understand calling close() in the ioThread is not
>>common,
>> >>as
>> >> >it
>> >> >may only trigger when we saw some non-retriable error. Hence when
>>user
>> >>run
>> >> >their program it is unlikely that close() will be triggered and
>>problem
>> >> >will be detected. So it seems to me that from the error detection
>> >>aspect
>> >> >these two options seems to be the same as people will usually
>>detect it
>> >> >from the producer metrics all dropping to 0.
>> >> >
>> >> >Guozhang
>> >> >
>> >> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
>> >>
>> >> >wrote:
>> >> >
>> >> >> It seems there are two options we can choose from when close() is
>> >>called
>> >> >> from sender thread (callback):
>> >> >> 1. Log an error and close the producer using close(-1)
>> >> >> 2. Log an error and block.
>> >> >> (Throwing an exception will not work because we catch all the
>> >>exception
>> >> >> thrown from user callback. It will just lead to an error log.)
>> >> >>
>> >> >> My concern for the first option is that the producer will be
>>closed
>> >>even
>> >> >> if we logged and error. I am wondering if some user would not even
>> >>take
>> >> >>a
>> >> >> look at the log if producer is closed normally. Because from the
>> >> >>programs
>> >> >> behavior, everything looks good. If that is the case, the error
>> >>message
>> >> >>we
>> >> >> logged probably will just be ignored until some day when people
>>check
>> >> >>the
>> >> >> log and see it.
>> >> >>
>> >> >> As for the second option, because producer does not close but
>>blocks.
>> >> >>User
>> >> >> will notice this the first time they run the program. They
>>probably
>> >>will
>> >> >> look at the log to see why producer could not be closed and they
>>will
>> >> >>see
>> >> >> the error log we put there. So they will get informed about this
>> >> >>mis-usage
>> >> >> of close() in sender thread the first time they run the code
>>instead
>> >>of
>> >> >> some time later.
>> >> >>
>> >> >> Personally I prefer the second one because it is more obvious that
>> >> >> something was wrong.
>> >> >>
>> >> >> Jiangjie (Becket) Qin
>> >> >>
>> >> >> On 3

[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364085#comment-14364085
 ] 

Yasuhiro Matsuda commented on KAFKA-527:


Updated reviewboard https://reviews.apache.org/r/31742/diff/
 against branch origin/trunk

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, 
> java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Yasuhiro Matsuda

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/
---

(Updated March 16, 2015, 10:19 p.m.)


Review request for kafka.


Bugs: KAFKA-527
https://issues.apache.org/jira/browse/KAFKA-527


Repository: kafka


Description
---

less byte copies


Diffs (updated)
-

  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
9c694719dc9b515fb3c3ae96435a87b334044272 
  core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/31742/diff/


Testing
---


Thanks,

Yasuhiro Matsuda



[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies

2015-03-16 Thread Yasuhiro Matsuda (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yasuhiro Matsuda updated KAFKA-527:
---
Attachment: KAFKA-527_2015-03-16_15:19:29.patch

> Compression support does numerous byte copies
> -
>
> Key: KAFKA-527
> URL: https://issues.apache.org/jira/browse/KAFKA-527
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Jay Kreps
>Assignee: Yasuhiro Matsuda
>Priority: Critical
> Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, 
> KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, 
> java.hprof.snappy.text
>
>
> The data path for compressing or decompressing messages is extremely 
> inefficient. We do something like 7 (?) complete copies of the data, often 
> for simple things like adding a 4 byte size to the front. I am not sure how 
> this went by unnoticed.
> This is likely the root cause of the performance issues we saw in doing bulk 
> recompression of data in mirror maker.
> The mismatch between the InputStream and OutputStream interfaces and the 
> Message/MessageSet interfaces which are based on byte buffers is the cause of 
> many of these.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-16 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76641
---


lgtm overall. Minor comments below.


core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala


should be not be -> can you fix/remove?



core/src/main/scala/kafka/cluster/Partition.scala


Wonder why this got split. Can you re-optimize imports?



core/src/main/scala/kafka/cluster/Partition.scala


Can you move the if statement to the next line



core/src/main/scala/kafka/cluster/Partition.scala


Can we rename the argument to maxLagMs?



core/src/main/scala/kafka/cluster/Partition.scala


Minor edit:
"has not read up to the LEO within the last replicaMaxLag ms, then the 
follower is lagging and should be removed from the ISR"



core/src/main/scala/kafka/cluster/Partition.scala


(Not part of your change, but could you change [%s,%d] to %s and replace 
topic, partitionId with TopicAndPartition(topic, partitionId)? We are trying to 
adopt a uniform convention everywhere in printing topic-partition and have been 
making these changes gradually, as they appear.)



core/src/main/scala/kafka/cluster/Partition.scala


same here



core/src/main/scala/kafka/cluster/Replica.scala


Can you rename this to lagBeginTimeMsUnderlying?



core/src/main/scala/kafka/cluster/Replica.scala


read up to the log end offset snapshot when the read was initiated ...



core/src/main/scala/kafka/server/ReplicaManager.scala


Can we rename this to logEndOffsetBeforeRead?

Also, can we just do with the Long (offset) instead of the entire 
LogOffsetMetadata?


- Joel Koshy


On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 16, 2015, 6:32 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time 
> since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the the 
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 
> 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 
> 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



Re: Review Request 31742: Patch for KAFKA-527

2015-03-16 Thread Yasuhiro Matsuda


On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote:
> > The inheritance of MessageWriter from BufferingOutputStream is a bit 
> > confusing, since it will always use itself in the writePayload function 
> > parameter. 
> > 
> > I feel it is clearer to read the code if we just let MessageWriter 
> > contain a var of BufferingOutputStream; and instead of passing in the 
> > function logic of writing the message, we can just pass in messages and 
> > offsetCounter in the write() call, which will then write the messages itself.
> 
> Yasuhiro Matsuda wrote:
> It is true that the current code writes only through writePayload. But I 
> wanted MessageWriter to be a subclass of OutputStream to be more generic in 
> case we need to write additional information other than messages in the future.
> 
> Guozhang Wang wrote:
> As of now MessageWriter's only public function is 
> write(key, codec)(valueWritefunction), which is used for writing a single 
> message. Also, its private functions withCrc32Prefix / withLengthPrefix are 
> only used for message writing. So your motivation for future extensions is a 
> bit unclear. Could you elaborate a bit more on that?

I don't know the future usages at this point.

Besides, withCrc32Prefix uses the internal structure of BufferingOutputStream 
for efficiency. Does this justify the inheritance? If we don't do so, the code 
will be more cluttered.
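
For context, here is a self-contained sketch of the technique in question,
simplified to a plain heap ByteBuffer rather than BufferingOutputStream's
internal chunks: reserve the prefix, write the payload once, then patch the
CRC in place with no extra copy of the payload.

import java.nio.ByteBuffer
import java.util.zip.CRC32

def withCrc32Prefix(buf: ByteBuffer)(writePayload: => Unit): Unit = {
  val crcPos = buf.position()
  buf.position(crcPos + 4)                 // reserve 4 bytes for the CRC prefix
  writePayload                             // payload is written exactly once
  val end = buf.position()
  val crc = new CRC32
  crc.update(buf.array(), buf.arrayOffset() + crcPos + 4, end - crcPos - 4)
  buf.putInt(crcPos, crc.getValue.toInt)   // patch the prefix in place
}

val buf = ByteBuffer.allocate(64)
withCrc32Prefix(buf) { buf.put("payload".getBytes("UTF-8")) }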


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76454
---


On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31742/
> ---
> 
> (Updated March 16, 2015, 10:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-527
> https://issues.apache.org/jira/browse/KAFKA-527
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> less byte copies
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
> 9c694719dc9b515fb3c3ae96435a87b334044272 
>   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31742/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-16 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364166#comment-14364166
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

[~jkreps][~junrao] Not sure if you guys had time to review 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface
 but I have to make a design choice and will appreciate your input.

As mentioned in the KIP, I originally thought we will just update TopicMetadata 
class to hold the acls and owner which should allow us to reuse 
TopicMetadataCache to get the acl information. However on further inspection I 
realized that TopicMetadataCache is just serving as the cache for Topic's 
partition state info and we have a completely different mechanism for caching 
and updating topic config entries.

Currently the topic config is all about Log configuration, so we have a 
TopicConfigManager which takes in a Log instance and keeps updating that 
instance's config as and when the topic config is updated. The topic 
config update notifications are sent by the Controller using ZK watchers.

I propose to introduce a TopicConfigCache which will be updated by 
TopicConfigManager on any config changes. Both the log instance and authorizer 
will share an instance of TopicConfigCache to read the config entries from it. 
The acls and owner of the topic will be stored as part of topic config. 
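
A rough sketch of the proposed cache (a hypothetical shape, for discussion
only): TopicConfigManager writes to it on ZK config-change notifications,
while the Log instance and the authorizer both read from it.

{code}
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap

class TopicConfigCache {
  private val configs = new ConcurrentHashMap[String, Properties]()

  // called by TopicConfigManager when a ZK config-change notification fires
  def update(topic: String, props: Properties): Unit = configs.put(topic, props)

  // read path shared by Log and Authorizer (e.g. to fetch acls/owner)
  def config(topic: String): Option[Properties] = Option(configs.get(topic))
}
{code}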

An alternate solution is to modify the TopicMetadataCache so it also has topic 
configs. The controller will have to send updateTopicMedataCache requests on 
both partition changes and config changes. We will have to deprecate 
TopicConfigManager and the controller code that updates zk state to fire config 
change watchers. 

I am currently blocked by this so I appreciate any feedback from you guys.

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)
Anatoli Fomenko created KAFKA-2023:
--

 Summary: git clone kafka repository requires https
 Key: KAFKA-2023
 URL: https://issues.apache.org/jira/browse/KAFKA-2023
 Project: Kafka
  Issue Type: Bug
  Components: website
Reporter: Anatoli Fomenko
Priority: Minor


From http://kafka.apache.org/code.html: 

Our code is kept in git. You can check it out like this:
git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka

On CentOS 6.5:

{code}
$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
error: RPC failed; result=22, HTTP code = 405
{code}

while:

{code}
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
Resolving deltas: 100% (14449/14449), done.
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364184#comment-14364184
 ] 

Joe Stein commented on KAFKA-2023:
--

works ok for me on ubuntu and redhat on two different networks

{code}

$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Cloning into 'kafka'...
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14447), reused 19803 (delta 11465)
Receiving objects: 100% (24607/24607), 15.62 MiB | 3.46 MiB/s, done.
Resolving deltas: 100% (14447/14447), done.
Checking connectivity... done.
{code}

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatoli Fomenko updated KAFKA-2023:
---
Attachment: KAFKA-2023.patch

Please review the patch.

Thank you.

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364205#comment-14364205
 ] 

Anatoli Fomenko commented on KAFKA-2023:


Works for me on MacOS but not on one CentOS 6.5 machine.

Perhaps these words would work better?

If you see "error: RPC failed; result=22, HTTP code = 405"
use 
git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka



> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jun Rao
It's probably useful for a client to know whether its requests are
throttled or not (e.g., for monitoring and alerting). From that
perspective, option B (delay the requests and return an error) seems better.

Thanks,

Jun

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Posted a KIP for quotas in kafka.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
>
> Appreciate any feedback.
>
> Aditya
>


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364263#comment-14364263
 ] 

Joe Stein commented on KAFKA-2023:
--

Looks like maybe the issue is the version of git; I tried a few other ASF repos 
and hit the same issue with git 1.7.1, which is what comes with yum install git.

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Aditya Auradkar
In Jay's approach, a client will simply experience a delay in receiving a 
response. The primary benefit is that there are no concerns regarding data-loss 
because the data has already been appended. Retries are also a non-issue since 
there is no need for them. However, the drawback to append and delay is that if 
the socket timeout is reached (30 second default I believe), the client can 
disconnect and try to resend the batch to the server. This will cause data 
duplication since the server cannot distinguish duplicate batches. However, it 
is very likely that the maximum quota delay will be lower than the socket 
timeout unless someone explicitly overrides it. We can make this even more 
unlikely by having a fixed lower bound on the socket timeout (10 seconds?). In 
this approach we must also ignore the request timeout since a small timeout 
will completely bypass quotas.

In the other approach, assuming the client only retries a fixed number of 
times, it will eventually experience data loss since the producer will drop the 
batch at some point. IMO, it is more likely that we will see this issue in 
production than the other issues identified above.

I agree with Jay that we can delay the request longer than the request timeout 
since it isn't possible to enforce perfectly on the server anyway. I think that 
we should have a maximum delay config on the server that provides a ceiling on 
the most time we can delay a request and have it be lower than the socket 
timeout. 

Initially, I preferred delay and error because it seems like the most natural 
way to handle quota violations, but I'm starting to see the merit in Jay's 
approach. Practically speaking, it reduces the number of moving parts in 
delivering quotas for Kafka. All changes are localized to the broker, and the 
approach is compatible with existing clients. Client changes will be required 
only if we return quota metadata in the responses or add a quota metadata API.
If we discover in production that this isn't working for some reason, we can 
always revisit the approach of returning errors and having the clients handle 
them.

Note that both these data loss/duplicate issues only affect the producer. 
Consumers should be fine regardless of the approach we choose.
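
A minimal sketch of the ceiling logic described above; the helper name and the 
numbers are assumptions for illustration, not actual Kafka configs or code:

public class QuotaDelaySketch {
    // Clamp the quota-imposed delay below both the configured ceiling and the
    // socket timeout, so a throttled client does not disconnect and resend.
    static long effectiveDelayMs(long quotaDelayMs, long maxDelayConfigMs, long socketTimeoutMs) {
        long ceiling = Math.min(maxDelayConfigMs, socketTimeoutMs - 1);
        return Math.min(quotaDelayMs, ceiling);
    }

    public static void main(String[] args) {
        // A computed 45s delay is clamped to the 20s ceiling, staying under
        // the 30s socket timeout mentioned above.
        System.out.println(effectiveDelayMs(45_000, 20_000, 30_000)); // prints 20000
    }
}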

Aditya

From: Jun Rao [j...@confluent.io]
Sent: Monday, March 16, 2015 4:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

It's probably useful for a client to know whether its requests are
throttled or not (e.g., for monitoring and alerting). From that
perspective, option B (delay the requests and return an error) seems better.

Thanks,

Jun

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
aaurad...@linkedin.com.invalid> wrote:

> Posted a KIP for quotas in kafka.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
>
> Appreciate any feedback.
>
> Aditya
>


[jira] [Created] (KAFKA-2024) Cleaner can generate unindexable log segments

2015-03-16 Thread Gian Merlino (JIRA)
Gian Merlino created KAFKA-2024:
---

 Summary: Cleaner can generate unindexable log segments
 Key: KAFKA-2024
 URL: https://issues.apache.org/jira/browse/KAFKA-2024
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Gian Merlino


It's possible for log cleaning to generate segments that have a gap of more 
than Int.MaxValue between their base offset and their last offset. It's not 
possible to index those segments since there's only 4 bytes available to store 
that difference. The broker will end up writing overflowed ints into the index, 
and doesn't detect that there is a problem until restarted, at which point you 
get one of these:

2015-03-16 20:35:49,632 FATAL [main] kafka.server.KafkaServerStartable - Fatal 
error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
index file (/mnt/persistent/kafka-logs/topic/.index) has 
non-zero size but the last offset is -1634293959 and the base offset is 0
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:204)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:203)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.loadSegments(Log.scala:203)
at kafka.log.Log.<init>(Log.scala:67)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
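
For illustration, here is a self-contained sketch of the overflow described 
above; the offsets are hypothetical, chosen only to reproduce the value in the 
log line:

public class IndexOverflowSketch {
    public static void main(String[] args) {
        // The offset index stores (offset - baseOffset) as a 4-byte int.
        long baseOffset = 0L;
        long lastOffset = 2_660_673_337L; // gap larger than Int.MaxValue
        int relative = (int) (lastOffset - baseOffset); // silently overflows
        System.out.println(relative); // prints -1634293959, as in the error above
    }
}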



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29091: Improve 1646 fix by reduce check if Os.IsWindows

2015-03-16 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29091/#review76688
---



core/src/main/scala/kafka/utils/Utils.scala


can we do setLength for all OSes, not just specific to Windows?
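
For context, a minimal Java sketch of the setLength preallocation being 
discussed; an illustration only, not the patch itself:

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class PreallocateSketch {
    // Open a segment file, optionally reserving its full size up front.
    // Growing the file incrementally can fragment it badly on NTFS.
    static RandomAccessFile openChannel(File file, boolean preallocate, long initFileSize)
            throws IOException {
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        if (preallocate) {
            raf.setLength(initFileSize); // e.g. the configured segment size
        }
        return raf;
    }
}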


- Sriharsha Chintalapani


On March 13, 2015, 3:12 a.m., Qianlin Xia wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29091/
> ---
> 
> (Updated March 13, 2015, 3:12 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1646
> https://issues.apache.org/jira/browse/KAFKA-1646
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Improve 1646 fix by reduce check if Os.IsWindows
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/FileMessageSet.scala 
> e1f8b979c3e6f62ea235bd47bc1587a1291443f9 
>   core/src/main/scala/kafka/log/Log.scala 
> 46df8d99d977a3b010a9b9f4698187fa9bfb2498 
>   core/src/main/scala/kafka/log/LogManager.scala 
> 7cee5435b23fcd0d76f531004911a2ca499df4f8 
>   core/src/main/scala/kafka/log/LogSegment.scala 
> 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/utils/Utils.scala 
> a89b0463685e6224d263bc9177075e1bb6b93d04 
> 
> Diff: https://reviews.apache.org/r/29091/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Qianlin Xia
> 
>



[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2015-03-16 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364344#comment-14364344
 ] 

Sriharsha Chintalapani commented on KAFKA-1646:
---

[~waldenchen] It looks like the patch is against the 0.8.1.1 branch. Can you 
send us a patch against trunk?


> Improve consumer read performance for Windows
> -
>
> Key: KAFKA-1646
> URL: https://issues.apache.org/jira/browse/KAFKA-1646
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1
> Environment: Windows
>Reporter: xueqiang wang
>Assignee: xueqiang wang
>  Labels: newbie, patch
> Attachments: Improve consumer read performance for Windows.patch, 
> KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, 
> KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, 
> KAFKA-1646_20150312_200352.patch
>
>
> This patch is for Window platform only. In Windows platform, if there are 
> more than one replicas writing to disk, the segment log files will not be 
> consistent in disk and then consumer reading performance will be dropped down 
> greatly. This fix allocates more disk spaces when rolling a new segment, and 
> then it will improve the consumer reading performance in NTFS file system.
> This patch doesn't affect file allocation of other filesystems, for it only 
> adds statements like 'if(Os.iswindow)' or adds methods used on Windows.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Anatoli Fomenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364404#comment-14364404
 ] 

Anatoli Fomenko commented on KAFKA-2023:


Concur: the problem persists with git 1.7.1, and does not with later versions, 
such as 2.0.4.

The question is whether the site "supports" git 1.7.1, which is the default 
CentOS 6 version.

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364412#comment-14364412
 ] 

Gwen Shapira commented on KAFKA-2023:
-

If it's the default on a popular OS, I'd update the docs with this information.

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein reassigned KAFKA-2023:


Assignee: Anatoly Fayngelerin

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Assignee: Anatoly Fayngelerin
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-2023:
-
Reviewer: Gwen Shapira

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Assignee: Anatoly Fayngelerin
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2023) git clone kafka repository requires https

2015-03-16 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein resolved KAFKA-2023.
--
Resolution: Fixed

pushed change to doc

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Assignee: Anatoly Fayngelerin
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing

2015-03-16 Thread Jun Rao
Hi, Joe,

A couple of comments.

1. When creating a new topic, our replica assignment algorithm tries to
achieve a few things: (a) all replicas are spread evenly across brokers;
(b) the preferred replicas (the first replica in each partition's assigned
replica list) are spread evenly across brokers; (c) the non-preferred
replicas are spread out in such a way that if we lose a broker, the load on
the failed broker is spread evenly among the remaining brokers. (A sketch of
an assignment along these lines follows at the end of this message.)

For example, if you look at the following replica assignment on brokers b1,
b2, and b3 (with replication factor 2). Broker b1 will be the leader for
partition p0 and p3. Broker b2 will be the leader for partition p1 and p4.
Broker b3 will be the leader for partition p2 and p5. If b1 is gone, b2
will take over as the leader for p0 and b3 will take over as the leader for
p3. This strategy makes sure that the load is even in the normal case as
well as the failure case.

b1 b2 b3
p0 p1 p2
p2 p0 p1
p3 p4 p5
p4 p5 p3

The current reassignment strategy actually maintains properties (a), (b)
and (c) after the reassignment completes.

The new algorithm takes the last few replicas from an overloaded broker and
moves them to an underloaded broker. It does reduce the data movement
compared with the current algorithm. It also maintains property (a).
However, it doesn't seem to explicitly maintain properties (b) and (c).
Data movement is a one-time cost. Maintaining balance after the data
movement has long term benefit. So, it will be useful to try to maintain
these properties even perhaps at the expense of a bit more data movement.

Also, I think the new algorithm needs to make sure that we don't move the
same replica to a new broker more than once.

2. I am not sure that we need to add a new --rebalance option. All we are
changing is the assignment strategy. If that's a better strategy than
before, there is no reason for anyone to use the old strategy. So, the new
strategy should just be used in the --generate mode.
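
For reference, a minimal sketch of a round-robin assignment that satisfies
properties (a) and (b) above, with follower placement shifted per round for
(c); this is an illustration only, not the actual AdminUtils code:

import java.util.*;

public class AssignmentSketch {
    // Round-robin leaders across brokers (property b); shift follower
    // placement as partitions wrap around so follower load spreads out too.
    static Map<Integer, List<Integer>> assign(int partitions, int brokers, int replicationFactor) {
        Map<Integer, List<Integer>> assignment = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            int leader = p % brokers;
            replicas.add(leader);
            int shift = 1 + (p / brokers) % Math.max(1, brokers - 1);
            for (int r = 1; r < replicationFactor; r++)
                replicas.add((leader + r * shift) % brokers);
            assignment.put(p, replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 6 partitions, 3 brokers, replication factor 2, as in the table above.
        assign(6, 3, 2).forEach((p, rs) -> System.out.println("p" + p + " -> " + rs));
    }
}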

Thanks,

Jun




On Wed, Mar 11, 2015 at 12:12 PM, Joe Stein  wrote:

> Sorry for not catching up on this thread earlier. I wanted to do this
> before the KIP got its updates so we could discuss if need be and not waste
> more time re-writing/reworking things that folks have issues with. I
> captured all the comments so far here with responses.
>
> << So fair assignment by count (taking into account the current partition
> count of each broker) is very good. However, it's worth noting that all
> partitions are not created equal. We have actually been performing more
> rebalance work based on the partition size on disk, as given equal
> retention of all topics, the size on disk is a better indicator of the
> amount of traffic a partition gets, both in terms of storage and network
> traffic. Overall, this seems to be a better balance.
>
> Agreed, though this is out of scope (imho) for what the motivations for the
> KIP were. The motivations section is blank (that is on me), but honestly it
> is because we did all the development, went back and forth with Neha on the
> testing, and then had to back it all into the KIP process... It's a
> time/resource/scheduling issue and I hope to update the KIP soon... all of
> this is in the JIRA and code patch, so it's not like it is not there, just
> not in the place where folks are maybe looking, since we changed where
> folks should look.
>
> Initial cut at "Motivations": --generate is not used by a lot of folks
> because they don't trust it, due to issues such as giving different results
> sometimes when you run it. There is also other feedback from the community
> that it does not account for specific use cases like "adding new brokers"
> and "removing brokers" (which is where that patch started,
> https://issues.apache.org/jira/browse/KAFKA-1678, but then we changed it
> after review into just --rebalance,
> https://issues.apache.org/jira/browse/KAFKA-1792). The use case for adding
> and removing brokers is one that happens in AWS and auto scaling. There are
> other reasons for this too, of course. The goal originally was to make what
> folks are already coding today (with the output of " available in the
> project for the community. Based on the discussion in the JIRA with Neha we
> all agreed that making it a fair rebalance would fulfill both use
> cases.
>
> << In addition to this, I think there is very much a need to have Kafka be
> rack-aware. That is, to be able to assure that for a given cluster, you
> never assign all replicas for a given partition in the same rack. This
> would allow us to guard against maintenances or power failures that affect
> a full rack of systems (or a given switch).
>
> Agreed, though I think this is out of scope for this change and something
> we can also do in the future. There is more that we have to figure out for
> rack awareness, specifically answering "how do we know what rack the broker
> is on". I really really (really) worry that we keep trying to put too much
> into a single change t

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Jun Rao
Hmm, does that mean that after close(0), the sender thread is not necessary
gone? Normally, after closing an entity, we expect all internal threads
associated with the entity are shut down completely.

Thanks,

Jun

On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin 
wrote:

> Hi Jun,
>
> Close(0) will set two flags in sender. Running=false and a newly added
> forceClose=true. It will also set accumulator.closed=true so no further
> producer.send() will succeed.
> The sender thread will finish executing all the callbacks in current batch
> of responses, then it will see the forceClose flag. It will just fail all
> the incomplete batches in the producer and exit.
> So close(0) is a non-blocking call and sender thread will not try to join
> itself in close(0).
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On 3/16/15, 2:50 PM, "Jun Rao"  wrote:
>
> >How does close(0) work if it's called from the sender thread? If close(0)
> >needs to wait for the sender thread to join, wouldn't this cause a
> >deadlock?
> >
> >Thanks,
> >
> >Jun
> >
> >On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin 
> >wrote:
> >
> >> Thanks Guozhang. It wouldn’t be as thoroughly considered without
> >> discussing with you :)
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:
> >>
> >> >Thanks Jiangjie,
> >> >
> >> >After talking to you offline on this, I have been convinced and
> >>changed my
> >> >preference to blocking. The immediate shutdown approach does have some
> >> >unsafeness in some cases.
> >> >
> >> >Guozhang
> >> >
> >> >On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
> >> >> >
> >> >wrote:
> >> >
> >> >> It looks that the problem we want to solve and the purpose we want to
> >> >> achieve is:
> >> >> If user uses close() in callback, we want to let user be aware that
> >>they
> >> >> should use close(0) instead of close() in the callback.
> >> >>
> >> >> We have agreed that we will have an error log to inform user about
> >>this
> >> >> mis-usage. The options differ in the way how we can force user to
> >>take a
> >> >> look at that error log.
> >> >> There are two scenarios:
> >> >> 1. User does not expect the program to exit.
> >> >> 2. User expect the program to exit.
> >> >>
> >> >> For scenario 1), blocking will probably delay the discovery of the
> >> >> problem. Calling close(0) exposes the problem quicker. In this
> >>scenario
> >> >> producer just encounter a send failure when running normally.
> >> >> For scenario 2), blocking will expose the problem quick. Calling
> >> >>close(-1)
> >> >> might hide the problem. This scenario might include: a) Unit test
> >>for a
> >> >> send failure. b) Message sending during a close() call from a user
> >> >>thread.
> >> >>
> >> >> So as a summary table:
> >> >>
> >> >>   Scenario 1) Scenario 2)
> >> >>
> >> >> Blocking  Delay problem discovery Guaranteed problem
> >> >>discovery
> >> >>
> >> >> Close(-1) Immediate problem discovery Problem might be hidden
> >> >>
> >> >>
> >> >> Personally I prefer blocking because it seems providing more
> >>guarantees
> >> >> and safer.
> >> >>
> >> >> Thanks.
> >> >>
> >> >> Jiangjie (Becket) Qin
> >> >>
> >> >>
> >> >> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
> >> >>
> >> >> >HI Jiangjie,
> >> >> >
> >> >> >As far as I understand calling close() in the ioThread is not
> >>common,
> >> >>as
> >> >> >it
> >> >> >may only trigger when we saw some non-retriable error. Hence when
> >>user
> >> >>run
> >> >> >their program it is unlikely that close() will be triggered and
> >>problem
> >> >> >will be detected. So it seems to me that from the error detection
> >> >>aspect
> >> >> >these two options seems to be the same as people will usually
> >>detect it
> >> >> >from the producer metrics all dropping to 0.
> >> >> >
> >> >> >Guozhang
> >> >> >
> >> >> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
> >> >>
> >> >> >wrote:
> >> >> >
> >> >> >> It seems there are two options we can choose from when close() is
> >> >>called
> >> >> >> from sender thread (callback):
> >> >> >> 1. Log an error and close the producer using close(-1)
> >> >> >> 2. Log an error and block.
> >> >> >> (Throwing an exception will not work because we catch all the
> >> >>exception
> >> >> >> thrown from user callback. It will just lead to an error log.)
> >> >> >>
> >> >> >> My concern for the first option is that the producer will be
> >>closed
> >> >>even
> >> >> >> if we logged and error. I am wondering if some user would not even
> >> >>take
> >> >> >>a
> >> >> >> look at the log if producer is closed normally. Because from the
> >> >> >>programs
> >> >> >> behavior, everything looks good. If that is the case, the error
> >> >>message
> >> >> >>we
> >> >> >> logged probably will just be ignored until some day when people
> >>check
> >> >> >>the
> >> >> >> log and see it.
> >> >> >>
> >> >> >> As for the second option, because producer does not close but
> >>blocks.
> >> >> >>User
> >> 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jay Kreps
Hey Jun,

I'd really really really like to avoid that. Having just spent a bunch of
time on the clients, using the error codes to encode other information
about the response is super dangerous. The error handling is one of the
hardest parts of the client (Guozhang chime in here).

Generally the error handling looks like
  if(error == none)
 // good, process the request
  else if(error == KNOWN_ERROR_1)
 // handle known error 1
  else if(error == KNOWN_ERROR_2)
 // handle known error 2
  else
 throw Errors.forCode(error).exception(); // or some other default
behavior

This works because we have a convention that an error is something that
prevented your getting the response, so the default handling case is sane
and forward compatible. It is tempting to use the error code to convey
information in the success case. For example, we could use error codes to
encode whether quotas were enforced, whether the request was served out of
cache, whether the stock market is up today, or whatever. The problem is
that since these are not errors as far as the client is concerned, it should
not throw an exception but process the response; yet now we have created an
explicit requirement that that code be handled explicitly since it is
different. I really think that this kind of information is not an error, it
is just information, and if we want it in the response we should do the
right thing and add a new field to the response.

I think you saw the Samza bug that was literally an example of this
happening and leading to an infinite retry loop.

Furthermore, I really want to emphasize that hitting your quota in the
design that Adi has proposed is actually not an error condition at all. It
is totally reasonable in any bootstrap situation to intentionally want to
run at the limit the system imposes on you.

-Jay
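
For concreteness, the dispatch convention above as runnable code with
hypothetical error codes; the exceptionFor helper is a stand-in for an
Errors.forCode(...).exception() style lookup, and the details are assumptions:

public class ErrorDispatchSketch {
    static final short NONE = 0, KNOWN_ERROR_1 = 1, KNOWN_ERROR_2 = 2;

    // Stand-in for an Errors.forCode(code).exception() style lookup.
    static RuntimeException exceptionFor(short code) {
        return new RuntimeException("unexpected error code " + code);
    }

    static void handle(short error) {
        if (error == NONE) {
            // good, process the response
        } else if (error == KNOWN_ERROR_1) {
            // handle known error 1
        } else if (error == KNOWN_ERROR_2) {
            // handle known error 2
        } else {
            // The default case is what keeps clients forward compatible:
            // anything unrecognized is treated as a failure, never silently
            // processed, which is why success info must not ride on this field.
            throw exceptionFor(error);
        }
    }
}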



On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:

> It's probably useful for a client to know whether its requests are
> throttled or not (e.g., for monitoring and alerting). From that
> perspective, option B (delay the requests and return an error) seems
> better.
>
> Thanks,
>
> Jun
>
> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
> aaurad...@linkedin.com.invalid> wrote:
>
> > Posted a KIP for quotas in kafka.
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
> >
> > Appreciate any feedback.
> >
> > Aditya
> >
>


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
I think it's not too late to reserve a set of error codes (200-299?)
for "non-error" codes.

It won't be backward compatible (i.e. clients that currently do "else
throw" will throw on non-errors), but perhaps it's worthwhile.

On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps  wrote:
> Hey Jun,
>
> I'd really really really like to avoid that. Having just spent a bunch of
> time on the clients, using the error codes to encode other information
> about the response is super dangerous. The error handling is one of the
> hardest parts of the client (Guozhang chime in here).
>
> Generally the error handling looks like
>   if(error == none)
>  // good, process the request
>   else if(error == KNOWN_ERROR_1)
>  // handle known error 1
>   else if(error == KNOWN_ERROR_2)
>  // handle known error 2
>   else
>  throw Errors.forCode(error).exception(); // or some other default
> behavior
>
> This works because we have a convention that and error is something that
> prevented your getting the response so the default handling case is sane
> and forward compatible. It is tempting to use the error code to convey
> information in the success case. For example we could use error codes to
> encode whether quotas were enforced, whether the request was served out of
> cache, whether the stock market is up today, or whatever. The problem is
> that since these are not errors as far as the client is concerned it should
> not throw an exception but process the response, but now we created an
> explicit requirement that that error be handled explicitly since it is
> different. I really think that this kind of information is not an error, it
> is just information, and if we want it in the response we should do the
> right thing and add a new field to the response.
>
> I think you saw the Samza bug that was literally an example of this
> happening and leading to an infinite retry loop.
>
> Further more I really want to emphasize that hitting your quota in the
> design that Adi has proposed is actually not an error condition at all. It
> is totally reasonable in any bootstrap situation to intentionally want to
> run at the limit the system imposes on you.
>
> -Jay
>
>
>
> On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:
>
>> It's probably useful for a client to know whether its requests are
>> throttled or not (e.g., for monitoring and alerting). From that
>> perspective, option B (delay the requests and return an error) seems
>> better.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
>> aaurad...@linkedin.com.invalid> wrote:
>>
>> > Posted a KIP for quotas in kafka.
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
>> >
>> > Appreciate any feedback.
>> >
>> > Aditya
>> >
>>


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Jay Kreps
My concern is that as soon as you start encoding non-error response
information into error codes the next question is what to do if two such
codes apply (i.e. you have a replica down and the response is quota'd). I
think I am trying to argue that error should mean "why we failed your
request", for which there will really only be one reason, and any other
useful information we want to send back is just another field in the
response.

-Jay

On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira  wrote:

> I think its not too late to reserve a set of error codes (200-299?)
> for "non-error" codes.
>
> It won't be backward compatible (i.e. clients that currently do "else
> throw" will throw on non-errors), but perhaps its worthwhile.
>
> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps  wrote:
> > Hey Jun,
> >
> > I'd really really really like to avoid that. Having just spent a bunch of
> > time on the clients, using the error codes to encode other information
> > about the response is super dangerous. The error handling is one of the
> > hardest parts of the client (Guozhang chime in here).
> >
> > Generally the error handling looks like
> >   if(error == none)
> >  // good, process the request
> >   else if(error == KNOWN_ERROR_1)
> >  // handle known error 1
> >   else if(error == KNOWN_ERROR_2)
> >  // handle known error 2
> >   else
> >  throw Errors.forCode(error).exception(); // or some other default
> > behavior
> >
> > This works because we have a convention that and error is something that
> > prevented your getting the response so the default handling case is sane
> > and forward compatible. It is tempting to use the error code to convey
> > information in the success case. For example we could use error codes to
> > encode whether quotas were enforced, whether the request was served out
> of
> > cache, whether the stock market is up today, or whatever. The problem is
> > that since these are not errors as far as the client is concerned it
> should
> > not throw an exception but process the response, but now we created an
> > explicit requirement that that error be handled explicitly since it is
> > different. I really think that this kind of information is not an error,
> it
> > is just information, and if we want it in the response we should do the
> > right thing and add a new field to the response.
> >
> > I think you saw the Samza bug that was literally an example of this
> > happening and leading to an infinite retry loop.
> >
> > Further more I really want to emphasize that hitting your quota in the
> > design that Adi has proposed is actually not an error condition at all.
> It
> > is totally reasonable in any bootstrap situation to intentionally want to
> > run at the limit the system imposes on you.
> >
> > -Jay
> >
> >
> >
> > On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:
> >
> >> It's probably useful for a client to know whether its requests are
> >> throttled or not (e.g., for monitoring and alerting). From that
> >> perspective, option B (delay the requests and return an error) seems
> >> better.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
> >> aaurad...@linkedin.com.invalid> wrote:
> >>
> >> > Posted a KIP for quotas in kafka.
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
> >> >
> >> > Appreciate any feedback.
> >> >
> >> > Aditya
> >> >
> >>
>


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
We discussed an error code for rate-limiting (which I think made
sense), isn't it a similar case?

On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps  wrote:
> My concern is that as soon as you start encoding non-error response
> information into error codes the next question is what to do if two such
> codes apply (i.e. you have a replica down and the response is quota'd). I
> think I am trying to argue that error should mean "why we failed your
> request", for which there will really only be one reason, and any other
> useful information we want to send back is just another field in the
> response.
>
> -Jay
>
> On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira  wrote:
>
>> I think its not too late to reserve a set of error codes (200-299?)
>> for "non-error" codes.
>>
>> It won't be backward compatible (i.e. clients that currently do "else
>> throw" will throw on non-errors), but perhaps its worthwhile.
>>
>> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps  wrote:
>> > Hey Jun,
>> >
>> > I'd really really really like to avoid that. Having just spent a bunch of
>> > time on the clients, using the error codes to encode other information
>> > about the response is super dangerous. The error handling is one of the
>> > hardest parts of the client (Guozhang chime in here).
>> >
>> > Generally the error handling looks like
>> >   if(error == none)
>> >  // good, process the request
>> >   else if(error == KNOWN_ERROR_1)
>> >  // handle known error 1
>> >   else if(error == KNOWN_ERROR_2)
>> >  // handle known error 2
>> >   else
>> >  throw Errors.forCode(error).exception(); // or some other default
>> > behavior
>> >
>> > This works because we have a convention that and error is something that
>> > prevented your getting the response so the default handling case is sane
>> > and forward compatible. It is tempting to use the error code to convey
>> > information in the success case. For example we could use error codes to
>> > encode whether quotas were enforced, whether the request was served out
>> of
>> > cache, whether the stock market is up today, or whatever. The problem is
>> > that since these are not errors as far as the client is concerned it
>> should
>> > not throw an exception but process the response, but now we created an
>> > explicit requirement that that error be handled explicitly since it is
>> > different. I really think that this kind of information is not an error,
>> it
>> > is just information, and if we want it in the response we should do the
>> > right thing and add a new field to the response.
>> >
>> > I think you saw the Samza bug that was literally an example of this
>> > happening and leading to an infinite retry loop.
>> >
>> > Further more I really want to emphasize that hitting your quota in the
>> > design that Adi has proposed is actually not an error condition at all.
>> It
>> > is totally reasonable in any bootstrap situation to intentionally want to
>> > run at the limit the system imposes on you.
>> >
>> > -Jay
>> >
>> >
>> >
>> > On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:
>> >
>> >> It's probably useful for a client to know whether its requests are
>> >> throttled or not (e.g., for monitoring and alerting). From that
>> >> perspective, option B (delay the requests and return an error) seems
>> >> better.
>> >>
>> >> Thanks,
>> >>
>> >> Jun
>> >>
>> >> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
>> >> aaurad...@linkedin.com.invalid> wrote:
>> >>
>> >> > Posted a KIP for quotas in kafka.
>> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
>> >> >
>> >> > Appreciate any feedback.
>> >> >
>> >> > Aditya
>> >> >
>> >>
>>


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Ewen Cheslack-Postava
Agreed that trying to shoehorn non-error codes into the error field is a
bad idea. It makes it *way* too easy to write code that looks (and should
be) correct but is actually incorrect. If necessary, I think it's much
better to spend a couple of extra bytes to encode that information
separately (a "status" or "warning" section of the response). An indication
that throttling is occurring is something I'd expect to be indicated by a
bit flag in the response rather than as an error code.

Gwen - I think an error code makes sense when the request actually failed.
Option B, which Jun was advocating, would have appended the messages
successfully. If the rate-limiting case you're talking about had
successfully committed the messages, I would say that's also a bad use of
error codes.
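
A sketch of that separate-status idea; the field names below are assumptions
for illustration, not the Kafka wire format:

public class ProduceResponseSketch {
    public final short errorCode;     // real failures only
    public final boolean throttled;   // status bit: the request was delayed by a quota
    public final long throttleTimeMs; // how long it was held; 0 if not throttled

    public ProduceResponseSketch(short errorCode, boolean throttled, long throttleTimeMs) {
        this.errorCode = errorCode;
        this.throttled = throttled;
        this.throttleTimeMs = throttleTimeMs;
    }
}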


On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
wrote:

> We discussed an error code for rate-limiting (which I think made
> sense), isn't it a similar case?
>
> On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps  wrote:
> > My concern is that as soon as you start encoding non-error response
> > information into error codes the next question is what to do if two such
> > codes apply (i.e. you have a replica down and the response is quota'd). I
> > think I am trying to argue that error should mean "why we failed your
> > request", for which there will really only be one reason, and any other
> > useful information we want to send back is just another field in the
> > response.
> >
> > -Jay
> >
> > On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira 
> wrote:
> >
> >> I think its not too late to reserve a set of error codes (200-299?)
> >> for "non-error" codes.
> >>
> >> It won't be backward compatible (i.e. clients that currently do "else
> >> throw" will throw on non-errors), but perhaps its worthwhile.
> >>
> >> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps  wrote:
> >> > Hey Jun,
> >> >
> >> > I'd really really really like to avoid that. Having just spent a
> bunch of
> >> > time on the clients, using the error codes to encode other information
> >> > about the response is super dangerous. The error handling is one of
> the
> >> > hardest parts of the client (Guozhang chime in here).
> >> >
> >> > Generally the error handling looks like
> >> >   if(error == none)
> >> >  // good, process the request
> >> >   else if(error == KNOWN_ERROR_1)
> >> >  // handle known error 1
> >> >   else if(error == KNOWN_ERROR_2)
> >> >  // handle known error 2
> >> >   else
> >> >  throw Errors.forCode(error).exception(); // or some other default
> >> > behavior
> >> >
> >> > This works because we have a convention that and error is something
> that
> >> > prevented your getting the response so the default handling case is
> sane
> >> > and forward compatible. It is tempting to use the error code to convey
> >> > information in the success case. For example we could use error codes
> to
> >> > encode whether quotas were enforced, whether the request was served
> out
> >> of
> >> > cache, whether the stock market is up today, or whatever. The problem
> is
> >> > that since these are not errors as far as the client is concerned it
> >> should
> >> > not throw an exception but process the response, but now we created an
> >> > explicit requirement that that error be handled explicitly since it is
> >> > different. I really think that this kind of information is not an
> error,
> >> it
> >> > is just information, and if we want it in the response we should do
> the
> >> > right thing and add a new field to the response.
> >> >
> >> > I think you saw the Samza bug that was literally an example of this
> >> > happening and leading to an infinite retry loop.
> >> >
> >> > Further more I really want to emphasize that hitting your quota in the
> >> > design that Adi has proposed is actually not an error condition at
> all.
> >> It
> >> > is totally reasonable in any bootstrap situation to intentionally
> want to
> >> > run at the limit the system imposes on you.
> >> >
> >> > -Jay
> >> >
> >> >
> >> >
> >> > On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:
> >> >
> >> >> It's probably useful for a client to know whether its requests are
> >> >> throttled or not (e.g., for monitoring and alerting). From that
> >> >> perspective, option B (delay the requests and return an error) seems
> >> >> better.
> >> >>
> >> >> Thanks,
> >> >>
> >> >> Jun
> >> >>
> >> >> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
> >> >> aaurad...@linkedin.com.invalid> wrote:
> >> >>
> >> >> > Posted a KIP for quotas in kafka.
> >> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas
> >> >> >
> >> >> > Appreciate any feedback.
> >> >> >
> >> >> > Aditya
> >> >> >
> >> >>
> >>
>



-- 
Thanks,
Ewen


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-16 Thread Guozhang Wang
Yeah, in this sense the sender thread will not exit immediately in the
close(0) call, but will only terminate after the current response batch has
been processed, as will the producer instance itself.

There is a reason for this though: for a clean shutdown the caller thread
has to wait for the sender thread to join before closing the producer
instance, but this cannot be achieved if close(0) is called by the sender
thread itself (for example, in KAFKA-1659 there is a proposal from Andrew
Stein on using thread.interrupt and thread.stop, but if it is called by the
ioThread itself the stop call will fail). Hence we came up with the flag
approach to let the sender thread close as soon as it is at the barrier
of the run loop.

Guozhang
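
A minimal sketch of the flag approach described above; class and field names
follow this thread but are assumptions, not the committed implementation:

import java.util.ArrayDeque;
import java.util.Deque;

public class SenderSketch implements Runnable {
    private volatile boolean running = true;
    private volatile boolean forceClose = false;
    private final Deque<Object> incompleteBatches = new ArrayDeque<>();

    // close(0): non-blocking and safe to call from the sender (callback)
    // thread itself, because it never joins the thread, it only raises flags.
    public void initiateForceClose() {
        forceClose = true;
        running = false;
    }

    @Override
    public void run() {
        while (running) {
            runOnce(); // poll and run the callbacks for one batch of responses
        }
        if (forceClose) {
            incompleteBatches.clear(); // drop remaining batches instead of draining
                                       // (in the real design their callbacks are failed)
        } else {
            while (!incompleteBatches.isEmpty()) runOnce(); // clean close drains first
        }
    }

    private void runOnce() { /* network poll and callback execution elided */ }
}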

On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao  wrote:

> Hmm, does that mean that after close(0), the sender thread is not necessary
> gone? Normally, after closing an entity, we expect all internal threads
> associated with the entity are shut down completely.
>
> Thanks,
>
> Jun
>
> On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin 
> wrote:
>
> > Hi Jun,
> >
> > Close(0) will set two flags in sender. Running=false and a newly added
> > forceClose=true. It will also set accumulator.closed=true so no further
> > producer.send() will succeed.
> > The sender thread will finish executing all the callbacks in current
> batch
> > of responses, then it will see the forceClose flag. It will just fail all
> > the incomplete batches in the producer and exit.
> > So close(0) is a non-blocking call and sender thread will not try to join
> > itself in close(0).
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On 3/16/15, 2:50 PM, "Jun Rao"  wrote:
> >
> > >How does close(0) work if it's called from the sender thread? If
> close(0)
> > >needs to wait for the sender thread to join, wouldn't this cause a
> > >deadlock?
> > >
> > >Thanks,
> > >
> > >Jun
> > >
> > >On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin  >
> > >wrote:
> > >
> > >> Thanks Guozhang. It wouldn’t be as thoroughly considered without
> > >> discussing with you :)
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:
> > >>
> > >> >Thanks Jiangjie,
> > >> >
> > >> >After talking to you offline on this, I have been convinced and
> > >>changed my
> > >> >preference to blocking. The immediate shutdown approach does have
> some
> > >> >unsafeness in some cases.
> > >> >
> > >> >Guozhang
> > >> >
> > >> >On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
> > >> > >> >
> > >> >wrote:
> > >> >
> > >> >> It looks that the problem we want to solve and the purpose we want
> to
> > >> >> achieve is:
> > >> >> If user uses close() in callback, we want to let user be aware that
> > >>they
> > >> >> should use close(0) instead of close() in the callback.
> > >> >>
> > >> >> We have agreed that we will have an error log to inform user about
> > >>this
> > >> >> mis-usage. The options differ in the way how we can force user to
> > >>take a
> > >> >> look at that error log.
> > >> >> There are two scenarios:
> > >> >> 1. User does not expect the program to exit.
> > >> >> 2. User expect the program to exit.
> > >> >>
> > >> >> For scenario 1), blocking will probably delay the discovery of the
> > >> >> problem. Calling close(0) exposes the problem quicker. In this
> > >>scenario
> > >> >> producer just encounter a send failure when running normally.
> > >> >> For scenario 2), blocking will expose the problem quick. Calling
> > >> >>close(-1)
> > >> >> might hide the problem. This scenario might include: a) Unit test
> > >>for a
> > >> >> send failure. b) Message sending during a close() call from a user
> > >> >>thread.
> > >> >>
> > >> >> So as a summary table:
> > >> >>
> > >> >>   Scenario 1) Scenario 2)
> > >> >>
> > >> >> Blocking  Delay problem discovery Guaranteed problem
> > >> >>discovery
> > >> >>
> > >> >> Close(-1) Immediate problem discovery Problem might be
> hidden
> > >> >>
> > >> >>
> > >> >> Personally I prefer blocking because it seems providing more
> > >>guarantees
> > >> >> and safer.
> > >> >>
> > >> >> Thanks.
> > >> >>
> > >> >> Jiangjie (Becket) Qin
> > >> >>
> > >> >>
> > >> >> On 3/16/15, 10:11 AM, "Guozhang Wang"  wrote:
> > >> >>
> > >> >> >HI Jiangjie,
> > >> >> >
> > >> >> >As far as I understand calling close() in the ioThread is not
> > >>common,
> > >> >>as
> > >> >> >it
> > >> >> >may only trigger when we saw some non-retriable error. Hence when
> > >>user
> > >> >>run
> > >> >> >their program it is unlikely that close() will be triggered and
> > >>problem
> > >> >> >will be detected. So it seems to me that from the error detection
> > >> >>aspect
> > >> >> >these two options seems to be the same as people will usually
> > >>detect it
> > >> >> >from the producer metrics all dropping to 0.
> > >> >> >
> > >> >> >Guozhang
> > >> >> >
> > >> >> >On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin
> > >> >>
> > >> >> >wrote

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Gwen Shapira
You are right, shoe-horning status into an error field is a bad idea.
While many projects use a single "status" field to indicate different
error and non-error states, it doesn't seem like a good fit for the
current Kafka implementation.

Do you think that adding a "status" field to our protocol is feasible
at this point?



On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava
 wrote:
> Agreed that trying to shoehorn non-error codes into the error field is a
> bad idea. It makes it *way* too easy to write code that looks (and should
> be) correct but is actually incorrect. If necessary, I think it's much
> better to to spend a couple of extra bytes to encode that information
> separately (a "status" or "warning" section of the response). An indication
> that throttling is occurring is something I'd expect to be indicated by a
> bit flag in the response rather than as an error code.
>
> Gwen - I think an error code makes sense when the request actually failed.
> Option B, which Jun was advocating, would have appended the messages
> successfully. If the rate-limiting case you're talking about had
> successfully committed the messages, I would say that's also a bad use of
> error codes.
>
>
> On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
> wrote:
>
>> We discussed an error code for rate-limiting (which I think made
>> sense), isn't it a similar case?
>>
>> On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps  wrote:
>> > My concern is that as soon as you start encoding non-error response
>> > information into error codes the next question is what to do if two such
>> > codes apply (i.e. you have a replica down and the response is quota'd). I
>> > think I am trying to argue that error should mean "why we failed your
>> > request", for which there will really only be one reason, and any other
>> > useful information we want to send back is just another field in the
>> > response.
>> >
>> > -Jay
>> >
>> > On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira 
>> wrote:
>> >
>> >> I think its not too late to reserve a set of error codes (200-299?)
>> >> for "non-error" codes.
>> >>
>> >> It won't be backward compatible (i.e. clients that currently do "else
>> >> throw" will throw on non-errors), but perhaps its worthwhile.
>> >>
>> >> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps  wrote:
>> >> > Hey Jun,
>> >> >
>> >> > I'd really really really like to avoid that. Having just spent a
>> bunch of
>> >> > time on the clients, using the error codes to encode other information
>> >> > about the response is super dangerous. The error handling is one of
>> the
>> >> > hardest parts of the client (Guozhang chime in here).
>> >> >
>> >> > Generally the error handling looks like
>> >> >   if(error == none)
>> >> >  // good, process the request
>> >> >   else if(error == KNOWN_ERROR_1)
>> >> >  // handle known error 1
>> >> >   else if(error == KNOWN_ERROR_2)
>> >> >  // handle known error 2
>> >> >   else
>> >> >  throw Errors.forCode(error).exception(); // or some other default
>> >> > behavior
>> >> >
>> >> > This works because we have a convention that and error is something
>> that
>> >> > prevented your getting the response so the default handling case is
>> sane
>> >> > and forward compatible. It is tempting to use the error code to convey
>> >> > information in the success case. For example we could use error codes
>> to
>> >> > encode whether quotas were enforced, whether the request was served
>> out
>> >> of
>> >> > cache, whether the stock market is up today, or whatever. The problem
>> is
>> >> > that since these are not errors as far as the client is concerned it
>> >> should
>> >> > not throw an exception but process the response, but now we created an
>> >> > explicit requirement that that error be handled explicitly since it is
>> >> > different. I really think that this kind of information is not an
>> error,
>> >> it
>> >> > is just information, and if we want it in the response we should do
>> the
>> >> > right thing and add a new field to the response.
>> >> >
>> >> > I think you saw the Samza bug that was literally an example of this
>> >> > happening and leading to an infinite retry loop.
>> >> >
>> >> > Further more I really want to emphasize that hitting your quota in the
>> >> > design that Adi has proposed is actually not an error condition at
>> all.
>> >> It
>> >> > is totally reasonable in any bootstrap situation to intentionally
>> want to
>> >> > run at the limit the system imposes on you.
>> >> >
>> >> > -Jay
>> >> >
>> >> >
>> >> >
>> >> > On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao  wrote:
>> >> >
>> >> >> It's probably useful for a client to know whether its requests are
>> >> >> throttled or not (e.g., for monitoring and alerting). From that
>> >> >> perspective, option B (delay the requests and return an error) seems
>> >> >> better.
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Jun
>> >> >>
>> >> >> On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar <
>> >> >> aaurad...@linkedin.com.invalid>

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Guozhang Wang
I think we are really discussing two separate issues here:

1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
block-then-returnFailDuetoThrottled for quota actions on produce requests.

Both these approaches assume some kind of well-behavedness of the clients:
option a) assumes the client sets a proper timeout value and can just
ignore the "OKButThrottled" response, while option b) assumes the client
handles "FailDuetoThrottled" appropriately. For any malicious clients
that, for example, just keep retrying, whether intentionally or not, neither
of these approaches is actually effective.

2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we encode
them as error codes or augment the protocol to use a separate field
indicating "status codes".

Today we have already incorporated some status codes as error codes in the
responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this
is of course using a single field for response status, like HTTP status
codes, while the con is that it requires clients to handle the error codes
carefully.

I think maybe we can actually extend the single-code approach to overcome
its drawbacks, that is, wrap the error code semantics for users so that
they do not need to handle the codes one by one. More concretely,
following Jay's example, the client could write something like this:


-

  if(error.isOK())
 // status code is good or the code can be simply ignored for this
request type, process the request
  else if(error.needsRetry())
 // throttled, transient error, etc: retry
  else if(error.isFatal())
 // non-retriable errors, etc: notify / terminate / other handling

-

Only when a client really wants to handle, for example, the FailDuetoThrottled
status code specifically does it need to:

  if(error.isOK())
 // status code is good or the code can be simply ignored for this
request type, process the request
  else if(error == FailDuetoThrottled )
 // throttled: log it
  else if(error.needsRetry())
 // transient error, etc: retry
  else if(error.isFatal())
 // non-retriable errors, etc: notify / terminate / other handling

-

And for the implementation we can probably group the codes accordingly, like
HTTP status codes, such that we can do:

boolean Error.isOK() {
  return code < 300 && code >= 200;
}
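
Filling that out slightly, a sketch of grouped status codes with HTTP-style
bands; the specific codes and bands here are assumptions:

public enum StatusSketch {
    OK(200), OK_BUT_THROTTLED(210),   // 2xx: success, possibly with extra info
    FAIL_DUE_TO_THROTTLED(310),       // 3xx: transient, worth retrying
    FATAL(500);                       // 5xx: non-retriable

    private final int code;
    StatusSketch(int code) { this.code = code; }

    public boolean isOK()       { return code >= 200 && code < 300; }
    public boolean needsRetry() { return code >= 300 && code < 400; }
    public boolean isFatal()    { return code >= 500; }
}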

Guozhang

On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava 
wrote:

> Agreed that trying to shoehorn non-error codes into the error field is a
> bad idea. It makes it *way* too easy to write code that looks (and should
> be) correct but is actually incorrect. If necessary, I think it's much
> better to to spend a couple of extra bytes to encode that information
> separately (a "status" or "warning" section of the response). An indication
> that throttling is occurring is something I'd expect to be indicated by a
> bit flag in the response rather than as an error code.
>
> Gwen - I think an error code makes sense when the request actually failed.
> Option B, which Jun was advocating, would have appended the messages
> successfully. If the rate-limiting case you're talking about had
> successfully committed the messages, I would say that's also a bad use of
> error codes.
>
>
> On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
> wrote:
>
> > We discussed an error code for rate-limiting (which I think made
> > sense), isn't it a similar case?
> >
> > On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps  wrote:
> > > My concern is that as soon as you start encoding non-error response
> > > information into error codes the next question is what to do if two
> such
> > > codes apply (i.e. you have a replica down and the response is
> quota'd). I
> > > think I am trying to argue that error should mean "why we failed your
> > > request", for which there will really only be one reason, and any other
> > > useful information we want to send back is just another field in the
> > > response.
> > >
> > > -Jay
> > >
> > > On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira 
> > wrote:
> > >
> > >> I think it's not too late to reserve a set of error codes (200-299?)
> > >> for "non-error" codes.
> > >>
> > >> It won't be backward compatible (i.e. clients that currently do "else
> > >> throw" will throw on non-errors), but perhaps its worthwhile.
> > >>
> > >> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps 
> wrote:
> > >> > Hey Jun,
> > >> >
> > >> > I'd really really really like to avoid that. Having just spent a
> > bunch of
> > >> > time on the clients, using the error codes to encode other
> information
> > >> > about the response is super dangerous. The error handling is one of
> > the
> > >> > hardest parts of the client (Guozhang chime in here).
> > >> >
> > >> > Generally the error handling looks like
> > >> >   if(error == none)
> > >> >  // good, process the request
> > >> >   else if(error == KNOWN_ERROR_1)
> > >> >  // handle known error 1
> > >> >   else if(error == KNOWN_ERR

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Steven Wu
Please correct me if I am missing something here. I don't understand how
throttling would work without cooperation/back-off from the producer. The
new Java producer supports a non-blocking API, so why would a delayed
response be able to slow the producer down? The producer will just
continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang  wrote:

> I think we are really discussing two separate issues here:
>
> 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
> block-then-returnFailDuetoThrottled for quota actions on produce requests.
>
> Both these approaches assume some kind of well-behaveness of the clients:
> option a) assumes the client sets an proper timeout value while can just
> ignore "OKButThrottled" response, while option b) assumes the client
> handles the "FailDuetoThrottled" appropriately. For any malicious clients
> that, for example, just keep retrying either intentionally or not, neither
> of these approaches are actually effective.
>
> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we encode
> them as error codes or augment the protocol to use a separate field
> indicating "status codes".
>
> Today we have already incorporated some status code as error codes in the
> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
> is of course using a single field for response status like the HTTP status
> codes, while the cons is that it requires clients to handle the error codes
> carefully.
>
> I think maybe we can actually extend the single-code approach to overcome
> its drawbacks, that is, wrap the error codes semantics to the users so that
> users do not need to handle the codes one-by-one. More concretely,
> following Jay's example the client could write sth. like this:
>
>
> -
>
>   if(error.isOK())
>  // status code is good or the code can be simply ignored for this
> request type, process the request
>   else if(error.needsRetry())
>  // throttled, transient error, etc: retry
>   else if(error.isFatal())
>  // non-retriable errors, etc: notify / terminate / other handling
>
> -
>
> Only when the clients really want to handle, for example FailDuetoThrottled
> status code specifically, it needs to:
>
>   if(error.isOK())
>  // status code is good or the code can be simply ignored for this
> request type, process the request
>   else if(error == FailDuetoThrottled )
>  // throttled: log it
>   else if(error.needsRetry())
>  // transient error, etc: retry
>   else if(error.isFatal())
>  // non-retriable errors, etc: notify / terminate / other handling
>
> -
>
> And for implementation we can probably group the codes accordingly like
> HTTP status code such that we can do:
>
> boolean Error.isOK() {
>   return code < 300 && code >= 200;
> }
>
> Guozhang
>
> On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava  >
> wrote:
>
> > Agreed that trying to shoehorn non-error codes into the error field is a
> > bad idea. It makes it *way* too easy to write code that looks (and should
> > be) correct but is actually incorrect. If necessary, I think it's much
> > better to to spend a couple of extra bytes to encode that information
> > separately (a "status" or "warning" section of the response). An
> indication
> > that throttling is occurring is something I'd expect to be indicated by a
> > bit flag in the response rather than as an error code.
> >
> > Gwen - I think an error code makes sense when the request actually
> failed.
> > Option B, which Jun was advocating, would have appended the messages
> > successfully. If the rate-limiting case you're talking about had
> > successfully committed the messages, I would say that's also a bad use of
> > error codes.
> >
> >
> > On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
> > wrote:
> >
> > > We discussed an error code for rate-limiting (which I think made
> > > sense), isn't it a similar case?
> > >
> > > On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps 
> wrote:
> > > > My concern is that as soon as you start encoding non-error response
> > > > information into error codes the next question is what to do if two
> > such
> > > > codes apply (i.e. you have a replica down and the response is
> > quota'd). I
> > > > think I am trying to argue that error should mean "why we failed your
> > > > request", for which there will really only be one reason, and any
> other
> > > > useful information we want to send back is just another field in the
> > > > response.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira  >
> > > wrote:
> > > >
> > > >> I think its not too late to reserve a set of error codes (200-299?)
> > > >> for "non-error" codes.
> > > >>
> > > >> It won't be backward compatible (i.e. clients that currently do
> "else
> > > >> throw" will throw on non-errors), but perhaps its worthwhile.
> > > >>
> > > >> On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps 
> > wrote:
> > > >> > Hey Jun,
> > > >> >

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Steven Wu
I think I can answer my own question. A delayed response will cause the
producer buffer to fill up, which then results in either the sending
thread blocking or messages being dropped.
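
To make that failure mode concrete, a minimal sketch against the 0.8.2-era
new Java producer; the broker address, topic, and sizes are placeholders,
and block.on.buffer.full is the knob that picks between the two behaviors:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BackpressureSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("buffer.memory", "1048576");     // small 1 MB accumulator
        props.put("block.on.buffer.full", "true"); // block send() when full;
                                                   // "false" throws instead

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // If the broker delays responses, in-flight batches are not
            // freed, the accumulator fills up, and this "non-blocking" loop
            // eventually blocks inside send(): that is the back-pressure.
            for (long i = 0; i < 1_000_000; i++) {
                producer.send(new ProducerRecord<>("some-topic", "key", "v" + i));
            }
        } finally {
            producer.close();
        }
    }
}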

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu  wrote:

> please correct me if I am missing sth here. I am not understanding how
> would throttle work without cooperation/back-off from producer. new Java
> producer supports non-blocking API. why would delayed response be able to
> slow down producer? producer will continue to fire async sends.
>
> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
> wrote:
>
>> I think we are really discussing two separate issues here:
>>
>> 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
>> block-then-returnFailDuetoThrottled for quota actions on produce requests.
>>
>> Both these approaches assume some kind of well-behaveness of the clients:
>> option a) assumes the client sets an proper timeout value while can just
>> ignore "OKButThrottled" response, while option b) assumes the client
>> handles the "FailDuetoThrottled" appropriately. For any malicious clients
>> that, for example, just keep retrying either intentionally or not, neither
>> of these approaches are actually effective.
>>
>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we
>> encode
>> them as error codes or augment the protocol to use a separate field
>> indicating "status codes".
>>
>> Today we have already incorporated some status code as error codes in the
>> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
>> is of course using a single field for response status like the HTTP status
>> codes, while the cons is that it requires clients to handle the error
>> codes
>> carefully.
>>
>> I think maybe we can actually extend the single-code approach to overcome
>> its drawbacks, that is, wrap the error codes semantics to the users so
>> that
>> users do not need to handle the codes one-by-one. More concretely,
>> following Jay's example the client could write sth. like this:
>>
>>
>> -
>>
>>   if(error.isOK())
>>  // status code is good or the code can be simply ignored for this
>> request type, process the request
>>   else if(error.needsRetry())
>>  // throttled, transient error, etc: retry
>>   else if(error.isFatal())
>>  // non-retriable errors, etc: notify / terminate / other handling
>>
>> -
>>
>> Only when the clients really want to handle, for example
>> FailDuetoThrottled
>> status code specifically, it needs to:
>>
>>   if(error.isOK())
>>  // status code is good or the code can be simply ignored for this
>> request type, process the request
>>   else if(error == FailDuetoThrottled )
>>  // throttled: log it
>>   else if(error.needsRetry())
>>  // transient error, etc: retry
>>   else if(error.isFatal())
>>  // non-retriable errors, etc: notify / terminate / other handling
>>
>> -
>>
>> And for implementation we can probably group the codes accordingly like
>> HTTP status code such that we can do:
>>
>> boolean Error.isOK() {
>>   return code < 300 && code >= 200;
>> }
>>
>> Guozhang
>>
>> On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava <
>> e...@confluent.io>
>> wrote:
>>
>> > Agreed that trying to shoehorn non-error codes into the error field is a
>> > bad idea. It makes it *way* too easy to write code that looks (and
>> should
>> > be) correct but is actually incorrect. If necessary, I think it's much
>> > better to to spend a couple of extra bytes to encode that information
>> > separately (a "status" or "warning" section of the response). An
>> indication
>> > that throttling is occurring is something I'd expect to be indicated by
>> a
>> > bit flag in the response rather than as an error code.
>> >
>> > Gwen - I think an error code makes sense when the request actually
>> failed.
>> > Option B, which Jun was advocating, would have appended the messages
>> > successfully. If the rate-limiting case you're talking about had
>> > successfully committed the messages, I would say that's also a bad use
>> of
>> > error codes.
>> >
>> >
>> > On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
>> > wrote:
>> >
>> > > We discussed an error code for rate-limiting (which I think made
>> > > sense), isn't it a similar case?
>> > >
>> > > On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps 
>> wrote:
>> > > > My concern is that as soon as you start encoding non-error response
>> > > > information into error codes the next question is what to do if two
>> > such
>> > > > codes apply (i.e. you have a replica down and the response is
>> > quota'd). I
>> > > > think I am trying to argue that error should mean "why we failed
>> your
>> > > > request", for which there will really only be one reason, and any
>> other
>> > > > useful information we want to send back is just another field in the
>> > > > response.
>> > > >
>> > > > -Jay
>> > > >
>> > > > On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira <
>> gshap...@cloude

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-16 Thread Steven Wu
Wait. We create one Kafka producer for each cluster, and each cluster can
have many topics. If the producer buffer fills up due to delayed responses
for one throttled topic, won't that penalize other topics unfairly? It
seems to me that the broker should just return an error without delay.

sorry that I am chatting to myself :)
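
A sketch of the sharing concern (topic names are hypothetical): both
topics drain from the single accumulator pool inside the one per-cluster
producer instance, so memory held by the throttled topic's delayed batches
is memory the healthy topic cannot use.

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedBufferSketch {
    // One producer per cluster means one shared buffer.memory pool. If
    // responses for "throttled-topic" are delayed, its batches keep holding
    // pool memory, so sends to "healthy-topic" can stall or fail as well,
    // even though that topic is under no quota: head-of-line blocking.
    static void sendToBoth(Producer<String, String> producer) {
        producer.send(new ProducerRecord<>("throttled-topic", "k", "v"));
        producer.send(new ProducerRecord<>("healthy-topic", "k", "v"));
    }
}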

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu  wrote:

> I think I can answer my own question. delayed response will cause the
> producer buffer to be full, which then result in either thread blocking or
> message drop.
>
> On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu  wrote:
>
>> please correct me if I am missing sth here. I am not understanding how
>> would throttle work without cooperation/back-off from producer. new Java
>> producer supports non-blocking API. why would delayed response be able to
>> slow down producer? producer will continue to fire async sends.
>>
>> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
>> wrote:
>>
>>> I think we are really discussing two separate issues here:
>>>
>>> 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
>>> block-then-returnFailDuetoThrottled for quota actions on produce
>>> requests.
>>>
>>> Both these approaches assume some kind of well-behaveness of the clients:
>>> option a) assumes the client sets an proper timeout value while can just
>>> ignore "OKButThrottled" response, while option b) assumes the client
>>> handles the "FailDuetoThrottled" appropriately. For any malicious clients
>>> that, for example, just keep retrying either intentionally or not,
>>> neither
>>> of these approaches are actually effective.
>>>
>>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we
>>> encode
>>> them as error codes or augment the protocol to use a separate field
>>> indicating "status codes".
>>>
>>> Today we have already incorporated some status code as error codes in the
>>> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
>>> is of course using a single field for response status like the HTTP
>>> status
>>> codes, while the cons is that it requires clients to handle the error
>>> codes
>>> carefully.
>>>
>>> I think maybe we can actually extend the single-code approach to overcome
>>> its drawbacks, that is, wrap the error codes semantics to the users so
>>> that
>>> users do not need to handle the codes one-by-one. More concretely,
>>> following Jay's example the client could write sth. like this:
>>>
>>>
>>> -
>>>
>>>   if(error.isOK())
>>>  // status code is good or the code can be simply ignored for this
>>> request type, process the request
>>>   else if(error.needsRetry())
>>>  // throttled, transient error, etc: retry
>>>   else if(error.isFatal())
>>>  // non-retriable errors, etc: notify / terminate / other handling
>>>
>>> -
>>>
>>> Only when the clients really want to handle, for example
>>> FailDuetoThrottled
>>> status code specifically, it needs to:
>>>
>>>   if(error.isOK())
>>>  // status code is good or the code can be simply ignored for this
>>> request type, process the request
>>>   else if(error == FailDuetoThrottled )
>>>  // throttled: log it
>>>   else if(error.needsRetry())
>>>  // transient error, etc: retry
>>>   else if(error.isFatal())
>>>  // non-retriable errors, etc: notify / terminate / other handling
>>>
>>> -
>>>
>>> And for implementation we can probably group the codes accordingly like
>>> HTTP status code such that we can do:
>>>
>>> boolean Error.isOK() {
>>>   return code < 300 && code >= 200;
>>> }
>>>
>>> Guozhang
>>>
>>> On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava <
>>> e...@confluent.io>
>>> wrote:
>>>
>>> > Agreed that trying to shoehorn non-error codes into the error field is
>>> a
>>> > bad idea. It makes it *way* too easy to write code that looks (and
>>> should
>>> > be) correct but is actually incorrect. If necessary, I think it's much
>>> > better to to spend a couple of extra bytes to encode that information
>>> > separately (a "status" or "warning" section of the response). An
>>> indication
>>> > that throttling is occurring is something I'd expect to be indicated
>>> by a
>>> > bit flag in the response rather than as an error code.
>>> >
>>> > Gwen - I think an error code makes sense when the request actually
>>> failed.
>>> > Option B, which Jun was advocating, would have appended the messages
>>> > successfully. If the rate-limiting case you're talking about had
>>> > successfully committed the messages, I would say that's also a bad use
>>> of
>>> > error codes.
>>> >
>>> >
>>> > On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira 
>>> > wrote:
>>> >
>>> > > We discussed an error code for rate-limiting (which I think made
>>> > > sense), isn't it a similar case?
>>> > >
>>> > > On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps 
>>> wrote:
>>> > > > My concern is that as soon as you start encoding non-error response
>>> > > > information into error code

[jira] [Updated] (KAFKA-1915) Integrate checkstyle for java code

2015-03-16 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-1915:
-
Resolution: Fixed
  Reviewer: Joel Koshy
Status: Resolved  (was: Patch Available)

Committed in 1c6d5bbac67.

> Integrate checkstyle for java code
> --
>
> Key: KAFKA-1915
> URL: https://issues.apache.org/jira/browse/KAFKA-1915
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Jay Kreps
>Priority: Minor
> Fix For: 0.8.3
>
> Attachments: KAFKA-1915.patch, KAFKA-1915_2015-02-03_09:17:34.patch
>
>
> There are a lot of little style and layering problems that tend to creep into 
> our code, especially with external patches and lax reviewers.
> These are the usual style suspects--capitalization, spacing, bracket 
> placement, etc.
> My personal pet peeve is a lack of clear thinking about layers. These 
> layering problems crept in quite fast, and sad to say a number of them were 
> accidentally caused by me. These are things like o.a.k.common depending on 
> o.a.k.clients or the consumer depending on the producer.
> I have a patch that integrates checkstyle to catch these issues at build 
> time, and which corrects the known problems. There are a fair number of very 
> small changes in this patch, all trivial.
> Checkstyle can be slightly annoying, not least because it has a couple of 
> minor bugs around anonymous inner class formatting, but I find that 98% of 
> what it flags are real style issues, so it is mostly worth it.
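
As an illustration of the layering problem the description mentions, this
is the kind of dependency a build-time check (e.g. checkstyle's
import-control rules) is meant to reject; the class names here are
hypothetical, the package direction is the point:

// Lives in o.a.k.common, which must NOT depend on o.a.k.clients.
package org.apache.kafka.common.metrics;

// A build-time import check would flag this line: it inverts the intended
// "clients depends on common" layering.
import org.apache.kafka.clients.producer.KafkaProducer;  // violation

public class MetricsReporterSketch {
    private KafkaProducer<byte[], byte[]> producer;  // couples common to clients
}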



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)