subscription

2012-12-03 Thread Prasanth J

Thanks
-- Prasanth



git mirror not updating

2012-12-03 Thread Jay Kreps
I don't think our git mirror is updating any more. Anyone know if there is
something we need to do to get this working again after the svn move?

(this may just be me being inept with git, though...)

-Jay


Re: git mirror not updating

2012-12-03 Thread Jun Rao
If it's not working, you can add a comment at this jira
https://issues.apache.org/jira/browse/INFRA-5587

We also have a separate jira to move from svn to git:
https://issues.apache.org/jira/browse/INFRA-5111

Thanks,

Jun

On Mon, Dec 3, 2012 at 9:10 AM, Jay Kreps  wrote:

> I don't think our git mirror is updating any more. Anyone know if there is
> something we need to do to get this working again after the svn move?
>
> (this may just be me being inept with git, though...)
>
> -Jay
>


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508898#comment-13508898
 ] 

Jay Kreps commented on KAFKA-598:
-

I have some minor stylistic feedback, but first I think it would be good to 
discuss the model this implements and get consensus on that.

My understanding of this patch is that it does the following:
1. multifetch all partitions using a per-partition fetch size based on the 
configuration the user provides in the consumer config (fetch.size)
2. check if there are any incomplete fetches and re-fetch these partitions 
using the consumer config (upper.fetch.size)

I think this may not be the best approach, but I am not sure. Here is my 
argument for why this configuration isn't really better than the current setup. 
Let's say you want to configure your consumer to be reliable and not crash; you 
need to align your jvm heap settings with your kafka memory usage. How much 
memory will this configuration use? Well, in the worst case all partitions will 
come back incomplete, so you need enough memory for upper.fetch.size * 
num_partitions. Actually, since we have a queue of chunks, it is a multiple of 
this, but I think we need to fix that as a separate issue, so ignore it for 
now. Two conclusions from this: (1) the only parameter that matters is 
upper.fetch.size, and if I have sufficient memory for that, why not fetch more? 
(2) the memory I need depends on the number of partitions I am assigned, but 
this is out of my control (if a consumer dies it will increase), so it is almost 
impossible to set this right.

Here is an alternative. Have only one configuration: max.fetch.size, which 
bounds the per-request memory allocation. Instead of using this for every 
partition, use max.fetch.size/num_partitions, so that increasing the 
number of partitions decreases the fetch size but does not increase memory 
usage. For incomplete fetches, follow up by doing a sequential fetch for each 
incomplete partition using the full max.fetch.size for just that partition. The 
reason I think this is better is that you get a hard bound on memory usage 
(which in practice you MUST have to run reliably), and this same bound also acts 
as the limit on the largest message you can handle. The two counter-arguments 
against this approach are (1) rather than crashing when you add partitions, this 
approach will get slower (due to smaller fetches and eventually sequential 
fetches); you could argue that crashing is better than slow; and (2) there could 
potentially be memory allocation downsides to doing large allocations in the 
common case (though there are definitely I/O benefits).

Let's figure this out and then I will do a more detailed review of the patch.
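
To make the memory argument concrete, here is a rough Scala sketch of the
alternative described above. Everything in it is illustrative: the names
(FetchSizing, perPartitionFetchSize), the 50MB bound, and the partition counts
are assumptions, not anything from the patch.

object FetchSizing {
  // Divide the single bound across whatever partitions are currently assigned.
  def perPartitionFetchSize(maxFetchSize: Int, numPartitions: Int): Int =
    math.max(1, maxFetchSize / numPartitions)

  def main(args: Array[String]): Unit = {
    val maxFetchSize = 50 * 1024 * 1024   // the one bound the user configures
    val partitions   = Seq.range(0, 100)  // partitions currently assigned to this consumer

    // Phase 1: multifetch everything with the divided size; per-request memory <= maxFetchSize.
    val fetchSize = perPartitionFetchSize(maxFetchSize, partitions.size)
    println(s"phase 1: ${partitions.size} partitions x $fetchSize bytes each")

    // Phase 2: any partition that came back incomplete gets a follow-up fetch at the
    // full bound, one partition at a time, so memory per request stays <= maxFetchSize.
    val incomplete = Seq(7, 42)           // pretend these came back incomplete
    incomplete.foreach(p => println(s"phase 2: refetch partition $p at $maxFetchSize bytes"))
  }
}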

> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size. 
> This increases the memory footprint on the consumer, especially when a large 
> number of topic/partition is subscribed. By decoupling the fetch size from 
> max message size, we can use a smaller fetch size for normal consumption and 
> when hitting a large message (hopefully rare), we automatically increase 
> fetch size to max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8

2012-12-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-642:


Priority: Blocker  (was: Major)

> Protocol tweaks for 0.8
> ---
>
> Key: KAFKA-642
> URL: https://issues.apache.org/jira/browse/KAFKA-642
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Blocker
> Attachments: KAFKA-642-v1.patch
>
>
> There are a couple of things in the protocol that are not ideal. It would be 
> good to tweak these for 0.8 so we start clean.
> Here is a set of problems and proposals:
> Problems:
> 1. Correlation id is not used across all the requests. I don't think it can 
> work as intended because of this.
> 2. On reflection I am not sure that we need a correlation id field. I think 
> that since we need to guarantee that processing is sequential on any 
> particular socket we can correlate with a simple queue. (e.g. as the client 
> sends messages it adds them to a queue and as it receives responses it just 
> correlates to whatever is at the head of the queue).
> 3. The metadata response seems to have a number of problems. Among them is 
> that it weirdly repeats all the broker information many times. The response 
> includes the ISR, leader (maybe), and the replicas. Each of these repeat all 
> the broker information. This is super weird. I think what we should be doing 
> here is including all broker information for all brokers and then just having 
> the appropriate ids for the isr, leader, and replicas.
> 4. For topic discovery I think we need to support the case where no topics 
> are specified in the metadata request and for this return information about 
> all topics. I don't think we do this now.
> 5. I don't understand what the creator id is.
> 6. The offset request and response is not fully thought through and should be 
> generalized.
> Proposals:
> 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe 
> useful for debugging to be able to trace a particular request from client to 
> server. So we will extend this across all the requests.
> 3. For metadata response I will try to fix this up by normalizing out the 
> broker list and having the isr, replicas, and leader field just have the node 
> id.
> 4. This should be uncontroversial and easy to add.
> 5. Let's remove creator id, it isn't used.
> 6. Let's generalize offset request. My proposal is below:
> Rename TopicMetadata API to ClusterMetadata, as this will contain all the 
> data that is known cluster-wide. Then let's generalize the offset request to 
> be PartitionMetadata--namely stuff about a particular partition on a 
> particular server.
> The format of PartitionMetadata would be the following:
> PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime 
> MaxSegmentInfos]]
>   TopicName => string
>   PartitionId => uint32
>   MinSegmentTime => uint64
>   MaxSegmentInfos => int32
> PartitionMetadataResponse => [TopicName [PartitionMetadata]]
>   TopicName => string
>   PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset 
> HighwaterMark [SegmentData]
>   SegmentData => StartOffset LastModifiedTime
>   LogSize => uint64
>   NumberOfSegments => int32
>   LogEndOffset => int64
>   HighwaterMark => int64
> This would be general enough that we could continue to add to it for any new 
> pieces of data we need.
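
An illustrative Scala mirror of the proposed wire format above (not code from
any patch; the unsigned wire types are shown as Int/Long):

case class SegmentData(startOffset: Long, lastModifiedTime: Long)

case class PartitionMetadata(partitionId: Int,       // uint32 on the wire
                             logSize: Long,          // uint64 on the wire
                             numberOfSegments: Int,
                             logEndOffset: Long,
                             highwaterMark: Long,
                             segments: Seq[SegmentData])

// topic -> (PartitionId, MinSegmentTime, MaxSegmentInfos) per the request BNF
case class PartitionMetadataRequest(topics: Map[String, Seq[(Int, Long, Int)]])

case class PartitionMetadataResponse(topics: Map[String, Seq[PartitionMetadata]])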

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] Subscription: outstanding kafka patches

2012-12-03 Thread jira
Issue Subscription
Filter: outstanding kafka patches (55 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-642   Protocol tweaks for 0.8
https://issues.apache.org/jira/browse/KAFKA-642
KAFKA-637   Separate log4j environment variable from KAFKA_OPTS in 
kafka-run-class.sh
https://issues.apache.org/jira/browse/KAFKA-637
KAFKA-628   System Test Failure Case 5005 (Mirror Maker bouncing) - Data Loss 
in ConsoleConsumer
https://issues.apache.org/jira/browse/KAFKA-628
KAFKA-621   System Test 9051 : ConsoleConsumer doesn't receives any data for 20 
topics but works for 10
https://issues.apache.org/jira/browse/KAFKA-621
KAFKA-608   getTopicMetadata does not respect producer config settings
https://issues.apache.org/jira/browse/KAFKA-608
KAFKA-607   System Test Transient Failure (case 4011 Log Retention) - 
ConsoleConsumer receives less data
https://issues.apache.org/jira/browse/KAFKA-607
KAFKA-606   System Test Transient Failure (case 0302 GC Pause) - Log segments 
mismatched across replicas
https://issues.apache.org/jira/browse/KAFKA-606
KAFKA-604   Add missing metrics in 0.8
https://issues.apache.org/jira/browse/KAFKA-604
KAFKA-598   decouple fetch size from max message size
https://issues.apache.org/jira/browse/KAFKA-598
KAFKA-583   SimpleConsumerShell may receive less data inconsistently
https://issues.apache.org/jira/browse/KAFKA-583
KAFKA-552   No error messages logged for those failing-to-send messages from 
Producer
https://issues.apache.org/jira/browse/KAFKA-552
KAFKA-547   The ConsumerStats MBean name should include the groupid
https://issues.apache.org/jira/browse/KAFKA-547
KAFKA-530   kafka.server.KafkaApis: kafka.common.OffsetOutOfRangeException
https://issues.apache.org/jira/browse/KAFKA-530
KAFKA-514   Replication with Leader Failure Test: Log segment files checksum 
mismatch
https://issues.apache.org/jira/browse/KAFKA-514
KAFKA-493   High CPU usage on inactive server
https://issues.apache.org/jira/browse/KAFKA-493
KAFKA-479   ZK EPoll taking 100% CPU usage with Kafka Client
https://issues.apache.org/jira/browse/KAFKA-479
KAFKA-465   Performance test scripts - refactoring leftovers from tools to perf 
package
https://issues.apache.org/jira/browse/KAFKA-465
KAFKA-438   Code cleanup in MessageTest
https://issues.apache.org/jira/browse/KAFKA-438
KAFKA-419   Updated PHP client library to support kafka 0.7+
https://issues.apache.org/jira/browse/KAFKA-419
KAFKA-414   Evaluate mmap-based writes for Log implementation
https://issues.apache.org/jira/browse/KAFKA-414
KAFKA-411   Message Error in high cocurrent environment
https://issues.apache.org/jira/browse/KAFKA-411
KAFKA-408   ProducerPerformance does not work with all producer config options
https://issues.apache.org/jira/browse/KAFKA-408
KAFKA-404   When using chroot path, create chroot on startup if it doesn't exist
https://issues.apache.org/jira/browse/KAFKA-404
KAFKA-399   0.7.1 seems to show less performance than 0.7.0
https://issues.apache.org/jira/browse/KAFKA-399
KAFKA-398   Enhance SocketServer to Enable Sending Requests
https://issues.apache.org/jira/browse/KAFKA-398
KAFKA-397   kafka.common.InvalidMessageSizeException: null
https://issues.apache.org/jira/browse/KAFKA-397
KAFKA-388   Add a highly available consumer co-ordinator to a Kafka cluster
https://issues.apache.org/jira/browse/KAFKA-388
KAFKA-374   Move to java CRC32 implementation
https://issues.apache.org/jira/browse/KAFKA-374
KAFKA-346   Don't call commitOffsets() during rebalance
https://issues.apache.org/jira/browse/KAFKA-346
KAFKA-345   Add a listener to ZookeeperConsumerConnector to get notified on 
rebalance events
https://issues.apache.org/jira/browse/KAFKA-345
KAFKA-319   compression support added to php client does not pass unit tests
https://issues.apache.org/jira/browse/KAFKA-319
KAFKA-318   update zookeeper dependency to 3.3.5
https://issues.apache.org/jira/browse/KAFKA-318
KAFKA-314   Go Client Multi-produce
https://issues.apache.org/jira/browse/KAFKA-314
KAFKA-313   Add JSON output and looping options to ConsumerOffsetChecker
https://issues.apache.org/jira/browse/KAFKA-313
KAFKA-312   Add 'reset' operation for AsyncProducerDroppedEvents
https://issues.apache.org/jira/browse/KAFKA-312
KAFKA-298   Go Client support max message size
https://issues.apache.org/jira/browse/KAFKA-298
KAFKA-297   Go Client Publisher Improvments
https://issues.apache.org/jira/browse/KAFKA-297
KAFKA-296   Update Go Client to new version of Go
https://issues.apache.org/jira/browse/KAFKA-296
K

Re: maintainers

2012-12-03 Thread Jonathan Creasy
I signed up under command line tools since no one had. I'll work on getting
up to speed there.

Looking forward to working with you all!
On Dec 3, 2012 10:20 AM, "Jay Kreps"  wrote:

> Hey guys,
>
> Jun, Neha, and I have signed up for some areas. It would be great if other
> committers could give an area of interest where they would be willing to
> serve as point:
>  https://cwiki.apache.org/confluence/display/KAFKA/Maintainers
>
> -Jay
>


[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8

2012-12-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-642:


Attachment: KAFKA-642-v2.patch

I should clarify that my goal here is to make the most minimal change that 
fixes the client protocol for at least the user-facing apis. I also wussed out 
on trying to generalize offsetrequest/response. Basically I think trying those 
things at this point would just take too long, and we are trying to stabilize 
and release.

So I made some of the changes you recommend, but some I think are bigger, and 
my hope was to hold off on those.

For example, my goal is not to implement correlation id, just to add it to the 
protocol. To properly handle correlation id we need to make it so that we have 
a single counter across all requests on a single connection, which is hard to do 
right now. I have some thoughts on generalizing some of our serialization and 
request handling stuff which I started to discuss in KAFKA-643. All I want to 
do now is fix as much of the protocol as I can while breaking as little as
possible in the process.
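
For reference, the per-connection counter amounts to something like the
following sketch (assumed names; this is not part of the patch):

import java.util.concurrent.atomic.AtomicInteger

// One instance per connection: every request sent on that connection takes the
// next id, and the value simply wraps around when the Int overflows.
class ConnectionCorrelation {
  private val counter = new AtomicInteger(0)
  def nextCorrelationId(): Int = counter.getAndIncrement()
}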

1. Agreed, fixed.

2. Ack, I missed the correlation id in OffsetResponse. I had intended to leave 
it out of the non-public apis since this was meant to be a minimal change, but 
it is easy to add so I will do so. This should simplify future upgrades.

3.1 Yeah, but see above comment.
3.2 I mean, properly speaking, having a default correlation id doesn't really 
make sense, does it? Anything other than a per-connection counter is basically a 
bug...

4. No, it is a signed int, so it should be fine for it to roll over every 4 
billion requests per connection; that will take a while.

5. Good point.

6. Done

7. See above comment on correlationId

8. Did it for DefaultEventHandler as that is easy; cowardly not attempting it 
for the consumer.

9.1 Done.
9.2  Deleted, should not duplicate protocol docs.
9.3 I chickened out on this. We will have to do it as a follow-up post 0.8 item.

10.1 Agreed, but this is several weeks of work I think. This is a pretty big 
refactoring. Some thoughts on KAFKA-643.

11. Yeah, I mean basically that constructor shouldn't exist at all since it 
isn't setting client id either. 

> Protocol tweaks for 0.8
> ---
>
> Key: KAFKA-642
> URL: https://issues.apache.org/jira/browse/KAFKA-642
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Blocker
> Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch
>
>
> There are a couple of things in the protocol that are not ideal. It would be 
> good to tweak these for 0.8 so we start clean.
> Here is a set of problems and proposals:
> Problems:
> 1. Correlation id is not used across all the requests. I don't think it can 
> work as intended because of this.
> 2. On reflection I am not sure that we need a correlation id field. I think 
> that since we need to guarantee that processing is sequential on any 
> particular socket we can correlate with a simple queue. (e.g. as the client 
> sends messages it adds them to a queue and as it receives responses it just 
> correlates to whatever is at the head of the queue).
> 3. The metadata response seems to have a number of problems. Among them is 
> that it weirdly repeats all the broker information many times. The response 
> includes the ISR, leader (maybe), and the replicas. Each of these repeat all 
> the broker information. This is super weird. I think what we should be doing 
> here is including all broker information for all brokers and then just having 
> the appropriate ids for the isr, leader, and replicas.
> 4. For topic discovery I think we need to support the case where no topics 
> are specified in the metadata request and for this return information about 
> all topics. I don't think we do this now.
> 5. I don't understand what the creator id is.
> 6. The offset request and response is not fully thought through and should be 
> generalized.
> Proposals:
> 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe 
> useful for debugging to be able to trace a particular request from client to 
> server. So we will extend this across all the requests.
> 3. For metadata response I will try to fix this up by normalizing out the 
> broker list and having the isr, replicas, and leader field just have the node 
> id.
> 4. This should be uncontroversial and easy to add.
> 5. Let's remove creator id, it isn't used.
> 6. Let's generalize offset request. My proposal is below:
> Rename TopicMetadata API to ClusterMetadata, as this will contain all the 
> data that is known cluster-wide. Then let's generalize the offset request to 
> be PartitionMetadata--namely stuff about a particular partition on a 
> particular server.
> The format of PartitionMetadata would be the following:
> PartitionMetadataRequest => [TopicName [Partiti

[jira] [Commented] (KAFKA-642) Protocol tweaks for 0.8

2012-12-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508985#comment-13508985
 ] 

Jay Kreps commented on KAFKA-642:
-

Updated patch addresses the issues as per the points above. Still not 
rebased...awaiting love from the Apache infra folks to revive our git.

> Protocol tweaks for 0.8
> ---
>
> Key: KAFKA-642
> URL: https://issues.apache.org/jira/browse/KAFKA-642
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Blocker
> Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch
>
>
> There are a couple of things in the protocol that are not ideal. It would be 
> good to tweak these for 0.8 so we start clean.
> Here is a set of problems and proposals:
> Problems:
> 1. Correlation id is not used across all the requests. I don't think it can 
> work as intended because of this.
> 2. On reflection I am not sure that we need a correlation id field. I think 
> that since we need to guarantee that processing is sequential on any 
> particular socket we can correlate with a simple queue. (e.g. as the client 
> sends messages it adds them to a queue and as it receives responses it just 
> correlates to whatever is at the head of the queue).
> 3. The metadata response seems to have a number of problems. Among them is 
> that it weirdly repeats all the broker information many times. The response 
> includes the ISR, leader (maybe), and the replicas. Each of these repeat all 
> the broker information. This is super weird. I think what we should be doing 
> here is including all broker information for all brokers and then just having 
> the appropriate ids for the isr, leader, and replicas.
> 4. For topic discovery I think we need to support the case where no topics 
> are specified in the metadata request and for this return information about 
> all topics. I don't think we do this now.
> 5. I don't understand what the creator id is.
> 6. The offset request and response is not fully thought through and should be 
> generalized.
> Proposals:
> 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe 
> useful for debugging to be able to trace a particular request from client to 
> server. So we will extend this across all the requests.
> 3. For metadata response I will try to fix this up by normalizing out the 
> broker list and having the isr, replicas, and leader field just have the node 
> id.
> 4. This should be uncontroversial and easy to add.
> 5. Let's remove creator id, it isn't used.
> 6. Let's generalize offset request. My proposal is below:
> Rename TopicMetadata API to ClusterMetadata, as this will contain all the 
> data that is known cluster-wide. Then let's generalize the offset request to 
> be PartitionMetadata--namely stuff about a particular partition on a 
> particular server.
> The format of PartitionMetadata would be the following:
> PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime 
> MaxSegmentInfos]]
>   TopicName => string
>   PartitionId => uint32
>   MinSegmentTime => uint64
>   MaxSegmentInfos => int32
> PartitionMetadataResponse => [TopicName [PartitionMetadata]]
>   TopicName => string
>   PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset 
> HighwaterMark [SegmentData]
>   SegmentData => StartOffset LastModifiedTime
>   LogSize => uint64
>   NumberOfSegments => int32
>   LogEndOffset => int64
>   HighwaterMark => int64
> This would be general enough that we could continue to add to it for any new 
> pieces of data we need.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8

2012-12-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-642:


Attachment: KAFKA-642-v3.patch

Missed two files on that last patch...

> Protocol tweaks for 0.8
> ---
>
> Key: KAFKA-642
> URL: https://issues.apache.org/jira/browse/KAFKA-642
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Priority: Blocker
> Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch, 
> KAFKA-642-v3.patch
>
>
> There are a couple of things in the protocol that are not ideal. It would be 
> good to tweak these for 0.8 so we start clean.
> Here is a set of problems and proposals:
> Problems:
> 1. Correlation id is not used across all the requests. I don't think it can 
> work as intended because of this.
> 2. On reflection I am not sure that we need a correlation id field. I think 
> that since we need to guarantee that processing is sequential on any 
> particular socket we can correlate with a simple queue. (e.g. as the client 
> sends messages it adds them to a queue and as it receives responses it just 
> correlates to whatever is at the head of the queue).
> 3. The metadata response seems to have a number of problems. Among them is 
> that it weirdly repeats all the broker information many times. The response 
> includes the ISR, leader (maybe), and the replicas. Each of these repeat all 
> the broker information. This is super weird. I think what we should be doing 
> here is including all broker information for all brokers and then just having 
> the appropriate ids for the isr, leader, and replicas.
> 4. For topic discovery I think we need to support the case where no topics 
> are specified in the metadata request and for this return information about 
> all topics. I don't think we do this now.
> 5. I don't understand what the creator id is.
> 6. The offset request and response is not fully thought through and should be 
> generalized.
> Proposals:
> 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe 
> useful for debugging to be able to trace a particular request from client to 
> server. So we will extend this across all the requests.
> 3. For metadata response I will try to fix this up by normalizing out the 
> broker list and having the isr, replicas, and leader field just have the node 
> id.
> 4. This should be uncontroversial and easy to add.
> 5. Let's remove creator id, it isn't used.
> 6. Let's generalize offset request. My proposal is below:
> Rename TopicMetadata API to ClusterMetadata, as this will contain all the 
> data that is known cluster-wide. Then let's generalize the offset request to 
> be PartitionMetadata--namely stuff about a particular partition on a 
> particular server.
> The format of PartitionMetadata would be the following:
> PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime 
> MaxSegmentInfos]]
>   TopicName => string
>   PartitionId => uint32
>   MinSegmentTime => uint64
>   MaxSegmentInfos => int32
> PartitionMetadataResponse => [TopicName [PartitionMetadata]]
>   TopicName => string
>   PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset 
> HighwaterMark [SegmentData]
>   SegmentData => StartOffset LastModifiedTime
>   LogSize => uint64
>   NumberOfSegments => int32
>   LogEndOffset => int64
>   HighwaterMark => int64
> This would be general enough that we could continue to add to it for any new 
> pieces of data we need.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-03 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509060#comment-13509060
 ] 

Jun Rao commented on KAFKA-598:
---

That's an interesting alternative. The main drawback that I see is the first 
problem that you raised. Any consumer that subscribes to a wildcard (e.g., 
mirror maker) could be consuming a growing # of topics over time. This means 
that one has to know the number of topic/partitions in order to set 
max.fetch.size properly and one has to keep tweaking it as the number of 
topic/partitions changes. 

> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size. 
> This increases the memory footprint on the consumer, especially when a large 
> number of topic/partition is subscribed. By decoupling the fetch size from 
> max message size, we can use a smaller fetch size for normal consumption and 
> when hitting a large message (hopefully rare), we automatically increase 
> fetch size to max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-03 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509064#comment-13509064
 ] 

Joel Koshy commented on KAFKA-598:
--

Thanks for bringing this up. I had considered a slight variant of your
suggestion (although I was thinking of issuing requests with upper fetch
size for incomplete partitions one partition at a time, instead of another
multi-fetch on all incomplete partitions as done in this patch). I didn't go
with it mainly due to the concern (1) that you raise - i.e., to avoid doing
sequential fetches, although I don't think that is too much of an issue.

Whether having an upper fetch size config is better than the current setup
depends on the use case: e.g., if there are only a few partitions that have
large messages, then the upper fetch size approach would work well. In the
worst case of all partitions being incomplete, that would lead to a large
allocation - which is also why I felt the "pipelined fetches of incomplete
partitions" approach added no real value (since it is equivalent in terms of
the net memory the client is expected to handle).

Anyway I think the above then leads naturally to your suggestion of using a
single fetch size and dividing that across all partitions. I like that
approach - especially since there are no new configs to deal with. I would
think the memory allocation concerns are valid but tolerable from the
client's perspective - i.e., the heavy memory allocations only kick in when
there are incomplete partitions in which case I think most clients would
want to consume anyway (along with a log warning indicating a large
message).  One minor drawback is that there isn't really a clear default
value for fetch size - right now, it is reasonable to say that a fetch size
of 1MB is also the (approximate) max size of a message. With the above
re-design we can no longer map the config to messages since there is no
prior knowledge of number of partitions consumed by each consumer thread,
but I don't think that's a big deal.

So as I see it, the choices are:

1) Change the semantics of fetch size to be net, across all partitions.
   If/when incomplete partitions are encountered, issue a follow-up fetch
   request of size fetch.size for each incomplete partition. I think if we do
   this we should also account for the total memory that would be used - i.e.,
   including the queued chunks.
   
2) Introduce a new config called upper fetch size that kicks in whenever
   there are incomplete partitions - for which:
   a) issue a multi-fetch request with size upper fetch size for all
  incomplete partitions.
   OR
   b) issue sequential fetch requests of upper fetch size, one incomplete
  partition at a time.

3) If we had byte-addressability for fetches (which I really think we should
   allow at least for these kinds of internal APIs) we could consider a
   third option: keep fetch size as is, and issue pipelined fetch requests
   to build up and complete an incomplete partition, one at a time.

What do people think?


> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size. 
> This increases the memory footprint on the consumer, especially when a large 
> number of topic/partition is subscribed. By decoupling the fetch size from 
> max message size, we can use a smaller fetch size for normal consumption and 
> when hitting a large message (hopefully rare), we automatically increase 
> fetch size to max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-598) decouple fetch size from max message size

2012-12-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509318#comment-13509318
 ] 

Jay Kreps commented on KAFKA-598:
-

So I guess the one hard requirement is that we have to be able to tell people 
how much memory our client will use. People have to set a heap size, and if we 
can't tell them how much memory we will use without crashing their app they 
will be unhappy.

Let's consider a hard case: 5 consumer processes, 100 topics with 5 partitions 
each, and a queue size of 5. With a fetch size of 1MB and an upper fetch size of 
50MB, what is a safe heap size for this person to configure and be assured we won't 
crash their app?
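
Back-of-the-envelope arithmetic for that case (illustrative only, assuming the
worst case where every queued chunk for every assigned partition is an
upper-sized fetch):

object HeapBound {
  def main(args: Array[String]): Unit = {
    val topics             = 100
    val partitionsPerTopic = 5
    val consumers          = 5
    val queuedChunks       = 5
    val upperFetchSize     = 50L * 1024 * 1024  // 50MB

    // 500 partitions split across 5 consumer processes = 100 partitions each.
    val partitionsPerConsumer = topics * partitionsPerTopic / consumers
    val worstCaseBytes = partitionsPerConsumer.toLong * queuedChunks * upperFetchSize
    println(s"worst case per consumer: ${worstCaseBytes / (1024L * 1024 * 1024)} GB")  // ~24 GB
  }
}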

This is why I don't really see pipelined fetches helping. They have to stitch it 
together into a ByteBuffer in the end, so fetching it in pieces doesn't really 
help. Supporting non-memory-resident messages is possible but would be a massive 
re-architecture of almost everything.

Another option I don't think you covered would be to change the fetch request 
so that it takes a single size rather than one per partition. That would solve 
one dilemma we currently have: many topics could have no new data, but we have 
to budget space for them as if they would (since they might); doing this on the 
server side, we could be a little bit smarter. However, we would need to ensure 
that one partition that has infinite data to read can't starve out other
partitions.
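
A sketch of what that server-side budgeting could look like (purely
illustrative; none of these names exist in the codebase): walk the partitions
round-robin in small chunks until one total byte budget is used up, so empty
partitions cost nothing and no single partition can monopolize the response.

object FillResponse {
  case class PartitionData(partition: Int, availableBytes: Int)  // bytes ready on the broker

  def fill(parts: Seq[PartitionData], totalBudget: Int, chunk: Int = 64 * 1024): Map[Int, Int] = {
    val granted = scala.collection.mutable.Map(parts.map(p => p.partition -> 0): _*)
    var remaining = totalBudget
    var progressed = true
    while (remaining > 0 && progressed) {
      progressed = false
      for (p <- parts if remaining > 0) {
        val take = math.min(chunk, math.min(remaining, p.availableBytes - granted(p.partition)))
        if (take > 0) {
          granted(p.partition) += take
          remaining -= take
          progressed = true
        }
      }
    }
    granted.toMap
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(PartitionData(0, 10 << 20), PartitionData(1, 0), PartitionData(2, 1 << 20))
    // Partition 1 is empty so it costs nothing; partitions 0 and 2 share the 2MB budget.
    println(fill(parts, totalBudget = 2 << 20))
  }
}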

> decouple fetch size from max message size
> -
>
> Key: KAFKA-598
> URL: https://issues.apache.org/jira/browse/KAFKA-598
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Jun Rao
>Assignee: Joel Koshy
> Attachments: KAFKA-598-v1.patch
>
>
> Currently, a consumer has to set fetch size larger than the max message size. 
> This increases the memory footprint on the consumer, especially when a large 
> number of topic/partition is subscribed. By decoupling the fetch size from 
> max message size, we can use a smaller fetch size for normal consumption and 
> when hitting a large message (hopefully rare), we automatically increase 
> fetch size to max message size temporarily.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-608) getTopicMetadata does not respect producer config settings

2012-12-03 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-608.
---


> getTopicMetadata does not respect producer config settings
> --
>
> Key: KAFKA-608
> URL: https://issues.apache.org/jira/browse/KAFKA-608
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Neha Narkhede
>Priority: Blocker
> Fix For: 0.8
>
> Attachments: kafka-608-v1.patch, kafka-608-v2.patch
>
>
> ProducerPool.scala contains the following code:
> object ProducerPool{
>   def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): 
> SyncProducer = {
> val props = new Properties()
> props.put("host", broker.host)
> props.put("port", broker.port.toString)
> if(configOpt.isDefined)
>   props.putAll(configOpt.get.props.props)
> new SyncProducer(new SyncProducerConfig(props))
>   }
> }
> Note also, that ClientUtils.getTopicMetadata() does the following:
>ProducerPool.createSyncProducer(None, brokers(i))
> As a result there is no way to control the socket settings for the get 
> metadata request.
> My recommendation is that we require the config to be specified in the 
> Note that this creates a new sync producer without using ANY of the settings 
> the user had given for the producer. In particular the socket timeout is 
> defaulted to 500ms. 
> This causes unit tests to fail a lot since a newly started test may easily 
> timeout on a 500ms request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-608) getTopicMetadata does not respect producer config settings

2012-12-03 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-608:


Resolution: Fixed
Status: Resolved  (was: Patch Available)

> getTopicMetadata does not respect producer config settings
> --
>
> Key: KAFKA-608
> URL: https://issues.apache.org/jira/browse/KAFKA-608
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8
>Reporter: Jay Kreps
>Assignee: Neha Narkhede
>Priority: Blocker
> Fix For: 0.8
>
> Attachments: kafka-608-v1.patch, kafka-608-v2.patch
>
>
> ProducerPool.scala contains the following code:
> object ProducerPool{
>   def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): 
> SyncProducer = {
> val props = new Properties()
> props.put("host", broker.host)
> props.put("port", broker.port.toString)
> if(configOpt.isDefined)
>   props.putAll(configOpt.get.props.props)
> new SyncProducer(new SyncProducerConfig(props))
>   }
> }
> Note also, that ClientUtils.getTopicMetadata() does the following:
>ProducerPool.createSyncProducer(None, brokers(i))
> As a result there is no way to control the socket settings for the get 
> metadata request.
> My recommendation is that we require the config to be specified in the 
> Note that this creates a new sync producer without using ANY of the settings 
> the user had given for the producer. In particular the socket timeout is 
> defaulted to 500ms. 
> This causes unit tests to fail a lot since a newly started test may easily 
> timeout on a 500ms request.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-622) Create mbeans per client

2012-12-03 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509335#comment-13509335
 ] 

Jun Rao commented on KAFKA-622:
---

Sorry for the late review. Have a few minor questions/comments.

60. What happens if we have 2 instances of Consumers with the same clientid in the 
same jvm? Does one of them fail because it fails to register metrics? Ditto for 
Producers.

61. ConsumerTopicStats: What if a topic is named AllTopics? We used to handle 
this by adding a - in topic-specific stats.

62. ZookeeperConsumerConnector: Do we need to validate groupid?

63. ClientId: Does the clientid length need to be different from topic length?

64. AbstractFetcherThread: When building a fetch request, do we need to pass in 
brokerInfo as part of the client id? BrokerInfo contains the source broker info 
and the fetch requests are always made to the source broker.



> Create mbeans per client 
> -
>
> Key: KAFKA-622
> URL: https://issues.apache.org/jira/browse/KAFKA-622
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8
>Reporter: Swapnil Ghike
>Assignee: Swapnil Ghike
>Priority: Blocker
>  Labels: bugs, improvement
> Fix For: 0.8
>
> Attachments: kafka-622-v1.patch, kafka-622-v2.patch, 
> kafka-622-v3.patch, kafka-622-v4.patch, kafka-622-v5.patch, kafka-622-v6.patch
>
>
> Currently we create one mbean of each type for a given mbean server, 
> regardless of the number of clients. We should create MBeans per client for 
> both producer and consumer. To do that we need to introduce clientId in mbean 
> names.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


How to unsubscribe from mailing list?

2012-12-03 Thread Marv Lush
I've tried sending an email to:


kafka-dev-unsubscr...@incubator.apache.org

kafka-dev-unsubscr...@kafka.apache.org

No luck.

any ideas?

thanks,

/m


Re: How to unsubscribe from mailing list?

2012-12-03 Thread Jun Rao
Try dev-unsubscr...@kafka.apache.org

Thanks,

Jun

On Mon, Dec 3, 2012 at 8:03 PM, Marv Lush  wrote:

> I've tried sending an email to:
>
>
> kafka-dev-unsubscr...@incubator.apache.org
>
> kafka-dev-unsubscr...@kafka.apache.org
>
> No luck.
>
> any ideas?
>
> thanks,
>
> /m
>


[jira] [Updated] (KAFKA-597) Refactor KafkaScheduler

2012-12-03 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-597:


Attachment: KAFKA-597-v1.patch

This patch refactors the scheduler as described above except that I didn't 
implement task cancelation.

It also converts LogManagerTest to use the new MockScheduler.

> Refactor KafkaScheduler
> ---
>
> Key: KAFKA-597
> URL: https://issues.apache.org/jira/browse/KAFKA-597
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Jay Kreps
>Priority: Minor
> Attachments: KAFKA-597-v1.patch
>
>
> It would be nice to cleanup KafkaScheduler. Here is what I am thinking
> Extract the following interface:
> trait Scheduler {
>   def startup()
>   def schedule(fun: () => Unit, name: String, delayMs: Long = 0, periodMs: 
> Long): Scheduled
>   def shutdown(interrupt: Boolean = false)
> }
> class Scheduled {
>   def lastExecution: Long
>   def cancel()
> }
> We would have two implementations, KafkaScheduler and  MockScheduler. 
> KafkaScheduler would be a wrapper for ScheduledThreadPoolExecutor. 
> MockScheduler would only allow manual time advancement rather than using the 
> system clock, we would switch unit tests over to this.
> This change would be different from the existing scheduler in the following 
> ways:
> 1. Would not return a ScheduledFuture (since this is useless)
> 2. shutdown() would be a blocking call. The current shutdown calls don't 
> really do what people want.
> 3. We would remove the daemon thread flag, as I don't think it works.
> 4. It returns an object which lets you cancel the job or get the last 
> execution time.
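
For illustration, a MockScheduler along these lines might look like the sketch
below. The Scheduler/Scheduled signatures are taken from the description above;
the advance() method and the internals are assumptions, not the actual patch.

import scala.collection.mutable

trait Scheduler {
  def startup(): Unit
  def schedule(fun: () => Unit, name: String, delayMs: Long = 0, periodMs: Long): Scheduled
  def shutdown(interrupt: Boolean = false): Unit
}

class Scheduled {
  @volatile var lastExecution: Long = -1L
  @volatile var cancelled: Boolean = false
  def cancel(): Unit = { cancelled = true }
}

// Runs tasks only when a test advances the clock explicitly; no system time, no threads.
class MockScheduler extends Scheduler {
  private case class Task(var nextRun: Long, period: Long, fun: () => Unit, handle: Scheduled)
  private val tasks = mutable.ArrayBuffer[Task]()
  private var now = 0L

  def startup(): Unit = {}
  def shutdown(interrupt: Boolean): Unit = tasks.clear()

  def schedule(fun: () => Unit, name: String, delayMs: Long, periodMs: Long): Scheduled = {
    val handle = new Scheduled
    tasks += Task(now + delayMs, periodMs, fun, handle)
    handle
  }

  // Manual time advancement: run every task that comes due within the advanced window.
  def advance(ms: Long): Unit = {
    now += ms
    for (t <- tasks if !t.handle.cancelled) {
      while (!t.handle.cancelled && t.nextRun <= now) {
        t.fun()
        t.handle.lastExecution = t.nextRun
        t.nextRun = if (t.period > 0) t.nextRun + t.period else Long.MaxValue
      }
    }
  }
}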

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira