subscription
Thanks -- Prasanth
git mirror not updating
I don't think our git mirror is updating any more. Anyone know if there is something we need to do to get this working again after the svn move? (this may just be me being inept with git, though...) -Jay
Re: git mirror not updating
If it's not working, you can add a comment at this jira https://issues.apache.org/jira/browse/INFRA-5587 We also have a separate jira to move from svn to git: https://issues.apache.org/jira/browse/INFRA-5111 Thanks, Jun On Mon, Dec 3, 2012 at 9:10 AM, Jay Kreps wrote: > I don't think our git mirror is updating any more. Anyone know if there is > something we need to do to get this working again after the svn move? > > (this may just be me being inept with git, though...) > > -Jay >
[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508898#comment-13508898 ] Jay Kreps commented on KAFKA-598: - I have some minor stylistic feedback, but first I think it would be good to discuss the model this implements and get consensus on that. My understanding of this patch is that it does the following: 1. multifetch all partitions using a per-partition fetch size based on the configuration the user provides in the consumer config (fetch.size); 2. check if there are any incomplete fetches and re-fetch those partitions using the consumer config (upper.fetch.size). I think this may not be the best approach, but I am not sure. Here is my argument for why this configuration isn't really better than the current setup. Let's say you want to configure your consumer to be reliable and not crash; you need to align your JVM heap settings with your Kafka memory usage. How much memory will this configuration use? Well, in the worst case all partitions will come back incomplete, so you need enough memory for upper.fetch.size * num_partitions. Actually, since we have a queue of chunks, it is a multiple of this, but I think we need to fix that as a separate issue, so ignore that for now. Two conclusions from this: (1) the only parameter that matters is upper.fetch.size, and if I have sufficient memory for that, why not fetch more? (2) the memory I need depends on the number of partitions I am assigned, but this is out of my control (if a consumer dies it will increase), so it is almost impossible to set this right. Here is an alternative. Have only one configuration: max.fetch.size, which bounds the per-request memory allocation. Instead of using this for every partition, use max.fetch.size/num_partitions so that increasing the number of partitions decreases the fetch size but does not increase memory usage. For incomplete fetches, follow up by doing a sequential fetch for each incomplete partition using the full max.fetch.size for just that partition. The reason I think this is better is that you get a hard bound on memory usage (which in practice you MUST have to run reliably), and this same bound also acts as the limit on the largest message you can handle. The two counter-arguments against this approach are: (1) rather than crashing when you add partitions, this approach will get slower (due to smaller fetches and eventually sequential fetches); you could argue that crashing is better than slowness; (2) there could potentially be memory allocation downsides to doing large allocations in the common case (though there are definitely I/O benefits). Let's figure this out and then I will do a more detailed review of the patch. > decouple fetch size from max message size > - > > Key: KAFKA-598 > URL: https://issues.apache.org/jira/browse/KAFKA-598 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Joel Koshy > Attachments: KAFKA-598-v1.patch > > > Currently, a consumer has to set fetch size larger than the max message size. > This increases the memory footprint on the consumer, especially when a large > number of topic/partition is subscribed. By decoupling the fetch size from > max message size, we can use a smaller fetch size for normal consumption and > when hitting a large message (hopefully rare), we automatically increase > fetch size to max message size temporarily. -- This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
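To make the memory argument in the comment above concrete, here is a rough sketch of the alternative it describes: one fixed max.fetch.size budget split evenly across the assigned partitions, with a sequential follow-up fetch at the full budget for any partition that comes back incomplete. This is an illustration of the idea only, not code from KAFKA-598-v1.patch; the FetchedData type and the fetchPartition callback are hypothetical stand-ins.

// Sketch of the proposed scheme: maxFetchSize bounds per-request memory
// regardless of how many partitions this consumer owns after a rebalance.
case class TopicAndPartition(topic: String, partition: Int)
case class FetchedData(bytes: Array[Byte], incomplete: Boolean)

def fetchAll(partitions: Seq[TopicAndPartition],
             maxFetchSize: Int,
             fetchPartition: (TopicAndPartition, Int) => FetchedData): Map[TopicAndPartition, FetchedData] = {
  // More partitions => smaller per-partition fetches, same worst-case memory.
  val perPartitionSize = math.max(1, maxFetchSize / partitions.size)
  val firstPass = partitions.map(tp => tp -> fetchPartition(tp, perPartitionSize)).toMap
  // Follow up on incomplete partitions one at a time with the full budget, so
  // max.fetch.size is also the largest message the consumer can handle.
  firstPass.map { case (tp, data) =>
    if (data.incomplete) tp -> fetchPartition(tp, maxFetchSize) else tp -> data
  }
}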
[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8
[ https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-642: Priority: Blocker (was: Major) > Protocol tweaks for 0.8 > --- > > Key: KAFKA-642 > URL: https://issues.apache.org/jira/browse/KAFKA-642 > Project: Kafka > Issue Type: Bug >Reporter: Jay Kreps >Priority: Blocker > Attachments: KAFKA-642-v1.patch > > > There are a couple of things in the protocol that are not idea. It would be > good to tweak these for 0.8 so we start clean. > Here is a set of problems and proposals: > Problems: > 1. Correlation id is not used across all the requests. I don't think it can > work as intended because of this. > 2. On reflection I am not sure that we need a correlation id field. I think > that since we need to guarantee that processing is sequential on any > particular socket we can correlate with a simple queue. (e.g. as the client > sends messages it adds them to a queue and as it receives responses it just > correlates to whatever is at the head of the queue). > 3. The metadata response seems to have a number of problems. Among them is > that it weirdly repeats all the broker information many times. The response > includes the ISR, leader (maybe), and the replicas. Each of these repeat all > the broker information. This is super weird. I think what we should be doing > here is including all broker information for all brokers and then just having > the appropriate ids for the isr, leader, and replicas. > 4. For topic discovery I think we need to support the case where no topics > are specified in the metadata request and for this return information about > all topics. I don't think we do this now. > 5. I don't understand what the creator id is. > 6. The offset request and response is not fully thought through and should be > generalized. > Proposals: > 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe > useful for debugging to be able to trace a particular request from client to > server. So we will extend this across all the requests. > 3. For metadata response I will try to fix this up by normalizing out the > broker list and having the isr, replicas, and leader field just have the node > id. > 4. This should be uncontroversial and easy to add. > 5. Let's remove creator id, it isn't used. > 6. Let's generalize offset request. My proposal is below: > Rename TopicMetadata API to ClusterMetadata, as this will contain all the > data that is known cluster-wide. Then let's generalize the offset request to > be PartitionMetadata--namely stuff about a particular partition on a > particular server. > The format of PartitionMetdata would be the following: > PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime > MaxSegmentInfos]] > TopicName => string > PartitionId => uint32 > MinSegmentTime => uint64 > MaxSegmentInfos => int32 > PartitionMetadataResponse => [TopicName [PartitionMetadata]] > TopicName => string > PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset > HighwaterMark [SegmentData] > SegmentData => StartOffset LastModifiedTime > LogSize => uint64 > NumberOfSegments => int32 > LogEndOffset => int64 > HighwaterMark => int64 > This would be general enough that we could continue to add to it for any new > pieces of data we need. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
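To help readers follow the PartitionMetadata proposal in item 6 above, here is a sketch of how the proposed request/response could be modeled on the client side. It simply restates the field grammar from the description; nothing like this exists in the codebase, and since Scala/Java have no unsigned integer types, the uint32/uint64 fields are shown as Int/Long.

// Hypothetical model of the proposed PartitionMetadata messages (proposal only).
case class SegmentData(startOffset: Long, lastModifiedTime: Long)

case class PartitionMetadata(partitionId: Int,      // uint32 in the proposal
                             logSize: Long,         // uint64
                             numberOfSegments: Int,
                             logEndOffset: Long,
                             highwaterMark: Long,
                             segments: Seq[SegmentData])

// topic -> (partitionId, minSegmentTime, maxSegmentInfos) triples
case class PartitionMetadataRequest(topics: Map[String, Seq[(Int, Long, Int)]])

// topic -> per-partition metadata
case class PartitionMetadataResponse(metadata: Map[String, Seq[PartitionMetadata]])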
[jira] Subscription: outstanding kafka patches
Issue Subscription Filter: outstanding kafka patches (55 issues) The list of outstanding kafka patches Subscriber: kafka-mailing-list Key Summary KAFKA-642 Protocol tweaks for 0.8 https://issues.apache.org/jira/browse/KAFKA-642 KAFKA-637 Separate log4j environment variable from KAFKA_OPTS in kafka-run-class.sh https://issues.apache.org/jira/browse/KAFKA-637 KAFKA-628 System Test Failure Case 5005 (Mirror Maker bouncing) - Data Loss in ConsoleConsumer https://issues.apache.org/jira/browse/KAFKA-628 KAFKA-621 System Test 9051 : ConsoleConsumer doesn't receives any data for 20 topics but works for 10 https://issues.apache.org/jira/browse/KAFKA-621 KAFKA-608 getTopicMetadata does not respect producer config settings https://issues.apache.org/jira/browse/KAFKA-608 KAFKA-607 System Test Transient Failure (case 4011 Log Retention) - ConsoleConsumer receives less data https://issues.apache.org/jira/browse/KAFKA-607 KAFKA-606 System Test Transient Failure (case 0302 GC Pause) - Log segments mismatched across replicas https://issues.apache.org/jira/browse/KAFKA-606 KAFKA-604 Add missing metrics in 0.8 https://issues.apache.org/jira/browse/KAFKA-604 KAFKA-598 decouple fetch size from max message size https://issues.apache.org/jira/browse/KAFKA-598 KAFKA-583 SimpleConsumerShell may receive less data inconsistently https://issues.apache.org/jira/browse/KAFKA-583 KAFKA-552 No error messages logged for those failing-to-send messages from Producer https://issues.apache.org/jira/browse/KAFKA-552 KAFKA-547 The ConsumerStats MBean name should include the groupid https://issues.apache.org/jira/browse/KAFKA-547 KAFKA-530 kafka.server.KafkaApis: kafka.common.OffsetOutOfRangeException https://issues.apache.org/jira/browse/KAFKA-530 KAFKA-514 Replication with Leader Failure Test: Log segment files checksum mismatch https://issues.apache.org/jira/browse/KAFKA-514 KAFKA-493 High CPU usage on inactive server https://issues.apache.org/jira/browse/KAFKA-493 KAFKA-479 ZK EPoll taking 100% CPU usage with Kafka Client https://issues.apache.org/jira/browse/KAFKA-479 KAFKA-465 Performance test scripts - refactoring leftovers from tools to perf package https://issues.apache.org/jira/browse/KAFKA-465 KAFKA-438 Code cleanup in MessageTest https://issues.apache.org/jira/browse/KAFKA-438 KAFKA-419 Updated PHP client library to support kafka 0.7+ https://issues.apache.org/jira/browse/KAFKA-419 KAFKA-414 Evaluate mmap-based writes for Log implementation https://issues.apache.org/jira/browse/KAFKA-414 KAFKA-411 Message Error in high cocurrent environment https://issues.apache.org/jira/browse/KAFKA-411 KAFKA-408 ProducerPerformance does not work with all producer config options https://issues.apache.org/jira/browse/KAFKA-408 KAFKA-404 When using chroot path, create chroot on startup if it doesn't exist https://issues.apache.org/jira/browse/KAFKA-404 KAFKA-399 0.7.1 seems to show less performance than 0.7.0 https://issues.apache.org/jira/browse/KAFKA-399 KAFKA-398 Enhance SocketServer to Enable Sending Requests https://issues.apache.org/jira/browse/KAFKA-398 KAFKA-397 kafka.common.InvalidMessageSizeException: null https://issues.apache.org/jira/browse/KAFKA-397 KAFKA-388 Add a highly available consumer co-ordinator to a Kafka cluster https://issues.apache.org/jira/browse/KAFKA-388 KAFKA-374 Move to java CRC32 implementation https://issues.apache.org/jira/browse/KAFKA-374 KAFKA-346 Don't call commitOffsets() during rebalance https://issues.apache.org/jira/browse/KAFKA-346 KAFKA-345 Add a listener to ZookeeperConsumerConnector to get 
notified on rebalance events https://issues.apache.org/jira/browse/KAFKA-345 KAFKA-319 compression support added to php client does not pass unit tests https://issues.apache.org/jira/browse/KAFKA-319 KAFKA-318 update zookeeper dependency to 3.3.5 https://issues.apache.org/jira/browse/KAFKA-318 KAFKA-314 Go Client Multi-produce https://issues.apache.org/jira/browse/KAFKA-314 KAFKA-313 Add JSON output and looping options to ConsumerOffsetChecker https://issues.apache.org/jira/browse/KAFKA-313 KAFKA-312 Add 'reset' operation for AsyncProducerDroppedEvents https://issues.apache.org/jira/browse/KAFKA-312 KAFKA-298 Go Client support max message size https://issues.apache.org/jira/browse/KAFKA-298 KAFKA-297 Go Client Publisher Improvments https://issues.apache.org/jira/browse/KAFKA-297 KAFKA-296 Update Go Client to new version of Go https://issues.apache.org/jira/browse/KAFKA-296 K
Re: maintainers
I signed up under command line tools since no one had. I'll work on getting up to speed there. Looking forward to working with you all! On Dec 3, 2012 10:20 AM, "Jay Kreps" wrote: > Hey guys, > > Jun, Neha, and I have signed up for some areas. It would be great if other > committers could give an area of interest where they would be willing to > serve as point: > https://cwiki.apache.org/confluence/display/KAFKA/Maintainers > > -Jay >
[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8
[ https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-642: Attachment: KAFKA-642-v2.patch I should clarify that my goal here is to make the most minimal change that fixes the client protocol for at least the user-facing APIs. I also wussed out on trying to generalize OffsetRequest/OffsetResponse. Basically, I think trying those things at this point would just take too long, and we are trying to stabilize and release. So I made some of the changes you recommend, but some I think are bigger, and my hope was to hold off on those. For example, my goal is not to implement correlation id, just to add it to the protocol. To properly handle correlation id we need to make it so that we have a single counter across all requests on a single connection, which is hard to do right now. I have some thoughts on generalizing some of our serialization and request handling code, which I started to discuss in KAFKA-643. All I want to do now is fix as much of the protocol as I can while breaking as little as possible in the process. 1. Agreed, fixed. 2. Ack, I missed the correlation id in OffsetResponse. I had intended to leave it out of the non-public APIs since this was meant to be a minimal change, but it is easy to add so I will do so. This should simplify future upgrades. 3.1 Yeah, but see the above comment. 3.2 I mean, properly speaking, having a default correlation id doesn't really make sense, does it? Anything other than a per-connection counter is basically a bug... 4. No, it is a signed int, so it should be fine for it to roll over every 4 billion requests per connection; that will take a while. 5. Good point. 6. Done. 7. See the above comment on correlationId. 8. Did it for DefaultEventHandler as that is easy; cowardly not attempting it for the consumer. 9.1 Done. 9.2 Deleted; we should not duplicate the protocol docs. 9.3 I chickened out on this. We will have to do it as a follow-up post-0.8 item. 10.1 Agreed, but this is several weeks of work, I think. It is a pretty big refactoring. Some thoughts are in KAFKA-643. 11. Yeah, I mean basically that constructor shouldn't exist at all since it isn't setting the client id either. > Protocol tweaks for 0.8 > --- > > Key: KAFKA-642 > URL: https://issues.apache.org/jira/browse/KAFKA-642 > Project: Kafka > Issue Type: Bug >Reporter: Jay Kreps >Priority: Blocker > Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch > > > There are a couple of things in the protocol that are not idea. It would be > good to tweak these for 0.8 so we start clean. > Here is a set of problems and proposals: > Problems: > 1. Correlation id is not used across all the requests. I don't think it can > work as intended because of this. > 2. On reflection I am not sure that we need a correlation id field. I think > that since we need to guarantee that processing is sequential on any > particular socket we can correlate with a simple queue. (e.g. as the client > sends messages it adds them to a queue and as it receives responses it just > correlates to whatever is at the head of the queue). > 3. The metadata response seems to have a number of problems. Among them is > that it weirdly repeats all the broker information many times. The response > includes the ISR, leader (maybe), and the replicas. Each of these repeat all > the broker information. This is super weird. I think what we should be doing > here is including all broker information for all brokers and then just having > the appropriate ids for the isr, leader, and replicas. > 4.
For topic discovery I think we need to support the case where no topics > are specified in the metadata request and for this return information about > all topics. I don't think we do this now. > 5. I don't understand what the creator id is. > 6. The offset request and response is not fully thought through and should be > generalized. > Proposals: > 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe > useful for debugging to be able to trace a particular request from client to > server. So we will extend this across all the requests. > 3. For metadata response I will try to fix this up by normalizing out the > broker list and having the isr, replicas, and leader field just have the node > id. > 4. This should be uncontroversial and easy to add. > 5. Let's remove creator id, it isn't used. > 6. Let's generalize offset request. My proposal is below: > Rename TopicMetadata API to ClusterMetadata, as this will contain all the > data that is known cluster-wide. Then let's generalize the offset request to > be PartitionMetadata--namely stuff about a particular partition on a > particular server. > The format of PartitionMetdata would be the following: > PartitionMetadataRequest => [TopicName [Partiti
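On point 3.2 above (a correlation id only makes sense as a per-connection counter), the following sketch shows the shape of the idea; it is not part of the attached patches, just an illustration combining the counter with the response queue described in problem 2 of the issue. It assumes a single sender/receiver per connection, so no synchronization is shown.

// Hypothetical per-connection correlator: ids come from a counter, and since
// processing on one socket is sequential, a response is matched against the
// request at the head of the in-flight queue.
import java.util.concurrent.atomic.AtomicInteger

class RequestCorrelator {
  private val counter = new AtomicInteger(0)
  private val inFlight = new java.util.ArrayDeque[java.lang.Integer]()

  // Assign the next id when a request is written to the socket.
  def nextCorrelationId(): Int = {
    val id = counter.getAndIncrement()   // signed int; wrapping after ~4 billion requests is acceptable
    inFlight.addLast(Int.box(id))        // responses arrive in send order on one socket
    id
  }

  // Check a response against the request at the head of the in-flight queue.
  def matchesNext(correlationId: Int): Boolean = {
    val head = inFlight.pollFirst()
    head != null && head.intValue == correlationId
  }
}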
[jira] [Commented] (KAFKA-642) Protocol tweaks for 0.8
[ https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13508985#comment-13508985 ] Jay Kreps commented on KAFKA-642: - Updated patch addresses issues as per points above. Still not rebased...await love from apache infra folks to revive our git. > Protocol tweaks for 0.8 > --- > > Key: KAFKA-642 > URL: https://issues.apache.org/jira/browse/KAFKA-642 > Project: Kafka > Issue Type: Bug >Reporter: Jay Kreps >Priority: Blocker > Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch > > > There are a couple of things in the protocol that are not idea. It would be > good to tweak these for 0.8 so we start clean. > Here is a set of problems and proposals: > Problems: > 1. Correlation id is not used across all the requests. I don't think it can > work as intended because of this. > 2. On reflection I am not sure that we need a correlation id field. I think > that since we need to guarantee that processing is sequential on any > particular socket we can correlate with a simple queue. (e.g. as the client > sends messages it adds them to a queue and as it receives responses it just > correlates to whatever is at the head of the queue). > 3. The metadata response seems to have a number of problems. Among them is > that it weirdly repeats all the broker information many times. The response > includes the ISR, leader (maybe), and the replicas. Each of these repeat all > the broker information. This is super weird. I think what we should be doing > here is including all broker information for all brokers and then just having > the appropriate ids for the isr, leader, and replicas. > 4. For topic discovery I think we need to support the case where no topics > are specified in the metadata request and for this return information about > all topics. I don't think we do this now. > 5. I don't understand what the creator id is. > 6. The offset request and response is not fully thought through and should be > generalized. > Proposals: > 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe > useful for debugging to be able to trace a particular request from client to > server. So we will extend this across all the requests. > 3. For metadata response I will try to fix this up by normalizing out the > broker list and having the isr, replicas, and leader field just have the node > id. > 4. This should be uncontroversial and easy to add. > 5. Let's remove creator id, it isn't used. > 6. Let's generalize offset request. My proposal is below: > Rename TopicMetadata API to ClusterMetadata, as this will contain all the > data that is known cluster-wide. Then let's generalize the offset request to > be PartitionMetadata--namely stuff about a particular partition on a > particular server. > The format of PartitionMetdata would be the following: > PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime > MaxSegmentInfos]] > TopicName => string > PartitionId => uint32 > MinSegmentTime => uint64 > MaxSegmentInfos => int32 > PartitionMetadataResponse => [TopicName [PartitionMetadata]] > TopicName => string > PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset > HighwaterMark [SegmentData] > SegmentData => StartOffset LastModifiedTime > LogSize => uint64 > NumberOfSegments => int32 > LogEndOffset => int64 > HighwaterMark => int64 > This would be general enough that we could continue to add to it for any new > pieces of data we need. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-642) Protocol tweaks for 0.8
[ https://issues.apache.org/jira/browse/KAFKA-642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-642: Attachment: KAFKA-642-v3.patch Missed two files on that last patch... > Protocol tweaks for 0.8 > --- > > Key: KAFKA-642 > URL: https://issues.apache.org/jira/browse/KAFKA-642 > Project: Kafka > Issue Type: Bug >Reporter: Jay Kreps >Priority: Blocker > Attachments: KAFKA-642-v1.patch, KAFKA-642-v2.patch, > KAFKA-642-v3.patch > > > There are a couple of things in the protocol that are not idea. It would be > good to tweak these for 0.8 so we start clean. > Here is a set of problems and proposals: > Problems: > 1. Correlation id is not used across all the requests. I don't think it can > work as intended because of this. > 2. On reflection I am not sure that we need a correlation id field. I think > that since we need to guarantee that processing is sequential on any > particular socket we can correlate with a simple queue. (e.g. as the client > sends messages it adds them to a queue and as it receives responses it just > correlates to whatever is at the head of the queue). > 3. The metadata response seems to have a number of problems. Among them is > that it weirdly repeats all the broker information many times. The response > includes the ISR, leader (maybe), and the replicas. Each of these repeat all > the broker information. This is super weird. I think what we should be doing > here is including all broker information for all brokers and then just having > the appropriate ids for the isr, leader, and replicas. > 4. For topic discovery I think we need to support the case where no topics > are specified in the metadata request and for this return information about > all topics. I don't think we do this now. > 5. I don't understand what the creator id is. > 6. The offset request and response is not fully thought through and should be > generalized. > Proposals: > 1, 2. Correlation id. This is not strictly speaking needed, but it is maybe > useful for debugging to be able to trace a particular request from client to > server. So we will extend this across all the requests. > 3. For metadata response I will try to fix this up by normalizing out the > broker list and having the isr, replicas, and leader field just have the node > id. > 4. This should be uncontroversial and easy to add. > 5. Let's remove creator id, it isn't used. > 6. Let's generalize offset request. My proposal is below: > Rename TopicMetadata API to ClusterMetadata, as this will contain all the > data that is known cluster-wide. Then let's generalize the offset request to > be PartitionMetadata--namely stuff about a particular partition on a > particular server. > The format of PartitionMetdata would be the following: > PartitionMetadataRequest => [TopicName [PartitionId MinSegmentTime > MaxSegmentInfos]] > TopicName => string > PartitionId => uint32 > MinSegmentTime => uint64 > MaxSegmentInfos => int32 > PartitionMetadataResponse => [TopicName [PartitionMetadata]] > TopicName => string > PartitionMetadata => PartitionId LogSize NumberOfSegments LogEndOffset > HighwaterMark [SegmentData] > SegmentData => StartOffset LastModifiedTime > LogSize => uint64 > NumberOfSegments => int32 > LogEndOffset => int64 > HighwaterMark => int64 > This would be general enough that we could continue to add to it for any new > pieces of data we need. -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509060#comment-13509060 ] Jun Rao commented on KAFKA-598: --- That's an interesting alternative. The main drawback that I see is the first problem that you raised. Any consumer that subscribes to a wildcard (e.g., mirror maker) could be consuming a growing # of topics over time. This means that one has to know the number of topic/partitions in order to set max.fetch.size properly and one has to keep tweaking it as the number of topic/partitions changes. > decouple fetch size from max message size > - > > Key: KAFKA-598 > URL: https://issues.apache.org/jira/browse/KAFKA-598 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Joel Koshy > Attachments: KAFKA-598-v1.patch > > > Currently, a consumer has to set fetch size larger than the max message size. > This increases the memory footprint on the consumer, especially when a large > number of topic/partition is subscribed. By decoupling the fetch size from > max message size, we can use a smaller fetch size for normal consumption and > when hitting a large message (hopefully rare), we automatically increase > fetch size to max message size temporarily. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509064#comment-13509064 ] Joel Koshy commented on KAFKA-598: -- Thanks for bringing this up. I had considered a slight variant of your suggestion (although I was thinking of issuing requests with the upper fetch size for incomplete partitions one partition at a time, instead of another multi-fetch on all incomplete partitions as done in this patch). I didn't go with it mainly due to the concern (1) that you raise - i.e., to avoid doing sequential fetches, although I don't think that is too much of an issue. Whether having an upper fetch size config is better than the current setup depends on the use case: e.g., if there are only a few partitions that have large messages, then the upper fetch size approach would work well. In the worst case of all partitions being incomplete, that would lead to a large allocation - which is also why I felt the "pipelined fetches of incomplete partitions" approach added no real value (since it is equivalent in terms of the net memory the client is expected to handle). Anyway, I think the above leads naturally to your suggestion of using a single fetch size and dividing that across all partitions. I like that approach - especially since there are no new configs to deal with. I would think the memory allocation concerns are valid but tolerable from the client's perspective - i.e., the heavy memory allocations only kick in when there are incomplete partitions, in which case I think most clients would want to consume anyway (along with a log warning indicating a large message). One minor drawback is that there isn't really a clear default value for fetch size - right now, it is reasonable to say that with a fetch size of 1MB, that is also the (approximate) max size of a message. With the above re-design we can no longer map the config to messages, since there is no prior knowledge of the number of partitions consumed by each consumer thread, but I don't think that's a big deal. So as I see it, the choices are: 1) Change the semantics of fetch size to be net, across all partitions. If/when incomplete partitions are encountered, issue a fetch request of size fetch.size for each incomplete partition. I think if we do this we should also account for the total memory that would be used - i.e., including the queued chunks. 2) Introduce a new config called upper fetch size that kicks in whenever there are incomplete partitions - for which: a) issue a multi-fetch request with size upper fetch size for all incomplete partitions, OR b) issue sequential fetch requests of upper fetch size, one incomplete partition at a time. 3) If we had byte-addressability for fetches (which I really think we should allow at least for these kinds of internal APIs) we could consider a third option: keep fetch size as is, and issue pipelined fetch requests to build up and complete each incomplete partition, one at a time. What do people think? > decouple fetch size from max message size > - > > Key: KAFKA-598 > URL: https://issues.apache.org/jira/browse/KAFKA-598 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Joel Koshy > Attachments: KAFKA-598-v1.patch > > > Currently, a consumer has to set fetch size larger than the max message size. > This increases the memory footprint on the consumer, especially when a large > number of topic/partition is subscribed. 
By decoupling the fetch size from > max message size, we can use a smaller fetch size for normal consumption and > when hitting a large message (hopefully rare), we automatically increase > fetch size to max message size temporarily. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-598) decouple fetch size from max message size
[ https://issues.apache.org/jira/browse/KAFKA-598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509318#comment-13509318 ] Jay Kreps commented on KAFKA-598: - So I guess the one hard requirement is that we have to be able to tell people how much memory our client will use. People have to set a heap size, and if we can't tell them how much memory we will use without crashing their app, they will be unhappy. Let's consider a hard case: 5 consumer processes, 100 topics with 5 partitions each, and a queue size of 5. With a fetch size of 1MB and an upper fetch size of 50MB, what is a safe heap size for this person to configure and be assured we won't crash their app? This is why I don't really see pipelined fetches helping. They have to stitch it together into a ByteBuffer in the end, so fetching that in pieces doesn't really help. Supporting non-memory-resident messages is possible but would be a massive re-architecture of almost everything. Another option I don't think you covered would be to change the fetch request so that it takes a single size rather than one per partition. That would solve one dilemma we currently have--many topics could have no new data, but we have to budget space for them as if they did (since they might); doing this on the server side, we could be a little bit smarter. However, we would need to ensure that one partition that has infinite data to read can't starve out other partitions. > decouple fetch size from max message size > - > > Key: KAFKA-598 > URL: https://issues.apache.org/jira/browse/KAFKA-598 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Jun Rao >Assignee: Joel Koshy > Attachments: KAFKA-598-v1.patch > > > Currently, a consumer has to set fetch size larger than the max message size. > This increases the memory footprint on the consumer, especially when a large > number of topic/partition is subscribed. By decoupling the fetch size from > max message size, we can use a smaller fetch size for normal consumption and > when hitting a large message (hopefully rare), we automatically increase > fetch size to max message size temporarily. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
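To put numbers on the hard case in the comment above, the worst-case arithmetic under the fetch.size/upper.fetch.size scheme looks roughly like this. The assumption that each queued chunk can itself be an upper-sized fetch is mine; the exact accounting depends on how chunks are queued, but the order of magnitude is the point.

// Back-of-the-envelope worst case for: 5 consumer processes, 100 topics with
// 5 partitions each, queue size 5, fetch.size 1MB, upper.fetch.size 50MB.
val topics             = 100
val partitionsPerTopic = 5
val consumerProcesses  = 5
val queuedChunks       = 5
val upperFetchBytes    = 50L * 1024 * 1024

// Assuming an even assignment, each process owns 500 / 5 = 100 partitions.
val partitionsPerProcess   = topics * partitionsPerTopic / consumerProcesses
val worstCasePerFetchRound = partitionsPerProcess * upperFetchBytes   // ~5 GB if every partition is incomplete
val worstCaseWithQueue     = worstCasePerFetchRound * queuedChunks    // ~25 GB once queued chunks are counted
// There is no reasonable heap setting that is guaranteed safe here, which is
// the argument for a single hard per-request bound (max.fetch.size).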
[jira] [Closed] (KAFKA-608) getTopicMetadata does not respect producer config settings
[ https://issues.apache.org/jira/browse/KAFKA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede closed KAFKA-608. --- > getTopicMetadata does not respect producer config settings > -- > > Key: KAFKA-608 > URL: https://issues.apache.org/jira/browse/KAFKA-608 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Neha Narkhede >Priority: Blocker > Fix For: 0.8 > > Attachments: kafka-608-v1.patch, kafka-608-v2.patch > > > ProducerPool.scala contains the following code: > object ProducerPool{ > def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): > SyncProducer = { > val props = new Properties() > props.put("host", broker.host) > props.put("port", broker.port.toString) > if(configOpt.isDefined) > props.putAll(configOpt.get.props.props) > new SyncProducer(new SyncProducerConfig(props)) > } > } > Note also, that ClientUtils.getTopicMetadata() does the following: >ProducerPool.createSyncProducer(None, brokers(i)) > As a result there is no way to control the socket settings for the get > metadata request. > My recommendation is that we require the config to be specified in the > Note that this creates a new sync producer without using ANY of the settings > the user had given for the producer. In particular the socket timeout is > defaulted to 500ms. > This causes unit tests to fail a lot since a newly started test may easily > timeout on a 500ms request. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
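Since the quoted code is the crux of the bug (a SyncProducer built with none of the user's settings, so the socket timeout falls back to 500ms), here is a sketch of the direction the description recommends: require the config rather than making it optional, and apply the broker host/port last so they cannot be overridden. This is an illustration only; the actual fix in kafka-608-v2.patch may be structured differently.

// Sketch only -- not the committed patch.
object ProducerPool {
  def createSyncProducer(config: ProducerConfig, broker: Broker): SyncProducer = {
    val props = new java.util.Properties()
    props.putAll(config.props.props)        // carry over user settings, including socket timeouts
    props.put("host", broker.host)          // broker coordinates set last so they always win
    props.put("port", broker.port.toString)
    new SyncProducer(new SyncProducerConfig(props))
  }
}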
[jira] [Updated] (KAFKA-608) getTopicMetadata does not respect producer config settings
[ https://issues.apache.org/jira/browse/KAFKA-608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-608: Resolution: Fixed Status: Resolved (was: Patch Available) > getTopicMetadata does not respect producer config settings > -- > > Key: KAFKA-608 > URL: https://issues.apache.org/jira/browse/KAFKA-608 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8 >Reporter: Jay Kreps >Assignee: Neha Narkhede >Priority: Blocker > Fix For: 0.8 > > Attachments: kafka-608-v1.patch, kafka-608-v2.patch > > > ProducerPool.scala contains the following code: > object ProducerPool{ > def createSyncProducer(configOpt: Option[ProducerConfig], broker: Broker): > SyncProducer = { > val props = new Properties() > props.put("host", broker.host) > props.put("port", broker.port.toString) > if(configOpt.isDefined) > props.putAll(configOpt.get.props.props) > new SyncProducer(new SyncProducerConfig(props)) > } > } > Note also, that ClientUtils.getTopicMetadata() does the following: >ProducerPool.createSyncProducer(None, brokers(i)) > As a result there is no way to control the socket settings for the get > metadata request. > My recommendation is that we require the config to be specified in the > Note that this creates a new sync producer without using ANY of the settings > the user had given for the producer. In particular the socket timeout is > defaulted to 500ms. > This causes unit tests to fail a lot since a newly started test may easily > timeout on a 500ms request. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-622) Create mbeans per client
[ https://issues.apache.org/jira/browse/KAFKA-622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13509335#comment-13509335 ] Jun Rao commented on KAFKA-622: --- Sorry for the late review. I have a few minor questions/comments. 60. What happens if we have 2 instances of Consumers with the same clientid in the same JVM? Does one of them fail because it fails to register metrics? Ditto for Producers. 61. ConsumerTopicStats: What if a topic is named AllTopics? We used to handle this by adding a "-" in topic-specific stats. 62. ZookeeperConsumerConnector: Do we need to validate groupid? 63. ClientId: Does the clientid length need to be different from the topic length? 64. AbstractFetcherThread: When building a fetch request, do we need to pass in brokerInfo as part of the client id? BrokerInfo contains the source broker info, and the fetch requests are always made to the source broker. > Create mbeans per client > - > > Key: KAFKA-622 > URL: https://issues.apache.org/jira/browse/KAFKA-622 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8 >Reporter: Swapnil Ghike >Assignee: Swapnil Ghike >Priority: Blocker > Labels: bugs, improvement > Fix For: 0.8 > > Attachments: kafka-622-v1.patch, kafka-622-v2.patch, > kafka-622-v3.patch, kafka-622-v4.patch, kafka-622-v5.patch, kafka-622-v6.patch > > > Currently we create one mbean of each type for a given mbean server, > regardless of the number of clients. We should create MBeans per client for > both producer and consumer. To do that we need to introduce clientId in mbean > names. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
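As background for the review comments above, the change is about folding the clientId into mbean names so that two clients in one JVM get separate beans. The snippet below only illustrates that naming idea using plain JMX; the actual patch goes through Kafka's metrics layer, so the domain, type, and delimiter here are hypothetical.

// Illustration of per-client mbean naming (hypothetical names, not from the patch).
import javax.management.ObjectName

def consumerTopicStatsName(clientId: String, topic: String): ObjectName =
  // A delimiter between clientId and topic is one way to keep a topic literally
  // named "AllTopics" distinguishable from an aggregate "AllTopics" bean (point 61).
  // Assumes clientId and topic contain no JMX special characters.
  new ObjectName("kafka.consumer:type=ConsumerTopicStats,name=" + clientId + "-" + topic)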
How to unsubscribe from mailing list?
I've tried sending an email to: kafka-dev-unsubscr...@incubator.apache.org kafka-dev-unsubscr...@kafka.apache.org No luck. any ideas? thanks, /m
Re: How to unsubscribe from mailing list?
Try dev-unsubscr...@kafka.apache.org Thanks, Jun On Mon, Dec 3, 2012 at 8:03 PM, Marv Lush wrote: > I've tried sending an email to: > > > kafka-dev-unsubscr...@incubator.apache.org > > kafka-dev-unsubscr...@kafka.apache.org > > No luck. > > any ideas? > > thanks, > > /m >
[jira] [Updated] (KAFKA-597) Refactor KafkaScheduler
[ https://issues.apache.org/jira/browse/KAFKA-597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-597: Attachment: KAFKA-597-v1.patch This patch refactors the scheduler as described above except that I didn't implement task cancelation. It also converts LogManagerTest to use the new MockScheduler. > Refactor KafkaScheduler > --- > > Key: KAFKA-597 > URL: https://issues.apache.org/jira/browse/KAFKA-597 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.1 >Reporter: Jay Kreps >Priority: Minor > Attachments: KAFKA-597-v1.patch > > > It would be nice to cleanup KafkaScheduler. Here is what I am thinking > Extract the following interface: > trait Scheduler { > def startup() > def schedule(fun: () => Unit, name: String, delayMs: Long = 0, periodMs: > Long): Scheduled > def shutdown(interrupt: Boolean = false) > } > class Scheduled { > def lastExecution: Long > def cancel() > } > We would have two implementations, KafkaScheduler and MockScheduler. > KafkaScheduler would be a wrapper for ScheduledThreadPoolExecutor. > MockScheduler would only allow manual time advancement rather than using the > system clock, we would switch unit tests over to this. > This change would be different from the existing scheduler in a the following > ways: > 1. Would not return a ScheduledFuture (since this is useless) > 2. shutdown() would be a blocking call. The current shutdown calls, don't > really do what people want. > 3. We would remove the daemon thread flag, as I don't think it works. > 4. It returns an object which let's you cancel the job or get the last > execution time. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira
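Since the description above only sketches the interface, here is a minimal illustration of what a MockScheduler with manual time advancement could look like. It is not the attached KAFKA-597-v1.patch: periodic re-scheduling, the Scheduled handle, and cancellation are omitted, and the schedule signature is simplified accordingly.

// Minimal mock: tasks run only when the test advances the clock.
import scala.collection.mutable

class MockScheduler {
  private var now = 0L
  private val tasks = mutable.ArrayBuffer.empty[(Long, () => Unit)]   // (execution time, task)

  def schedule(fun: () => Unit, name: String, delayMs: Long): Unit =
    tasks += ((now + delayMs, fun))

  // Advance the mock clock and run, in time order, every task that is now due.
  def advance(ms: Long): Unit = {
    now += ms
    val (due, pending) = tasks.partition(_._1 <= now)
    tasks.clear()
    tasks ++= pending
    due.sortBy(_._1).foreach { case (_, fun) => fun() }
  }
}

// Usage in a test: schedule(() => counter += 1, "tick", 50); advance(100) runs the task.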