Re: metadata request/response question
When you send it a topic that does not exist, it sends you back an empty list of brokers. Extending that, I suppose it would send only those brokers that contain data for the requested topics.

Regards
Milind

On Wed, Jun 12, 2013 at 2:24 PM, Dave Peterson wrote:
> Suppose I send a metadata request for a single topic or some small
> subset of the entire set of available topics. Will the response contain
> broker information for all brokers, or just brokers that contain data for
> at least one of the requested topics?
>
> Thanks,
> Dave
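For reference, this is roughly what a metadata request for "a single topic or some small subset" looks like on the wire. It is a minimal Python sketch that assumes the 0.8 layout from the Kafka protocol guide: an int32 Size prefix, the common header (ApiKey, ApiVersion, CorrelationId, ClientId), then MetadataRequest => [TopicName] with ApiKey 3. `encode_metadata_request` is a hypothetical helper, not part of any Kafka client.

```python
import struct


def kafka_string(s: str) -> bytes:
    """Protocol string: int16 big-endian byte length, then the UTF-8 bytes."""
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_metadata_request(correlation_id: int, client_id: str, topics: list) -> bytes:
    """Size-prefixed MetadataRequest (ApiKey 3, ApiVersion 0) for a topic subset."""
    # Common header: ApiKey, ApiVersion, CorrelationId, ClientId.
    body = struct.pack(">hhi", 3, 0, correlation_id) + kafka_string(client_id)
    # [TopicName]: int32 element count, then each topic as a protocol string.
    body += struct.pack(">i", len(topics))
    for topic in topics:
        body += kafka_string(topic)
    # The Size field counts the bytes that follow it, not itself.
    return struct.pack(">i", len(body)) + body
```

Asking for one topic only changes the array count and contents. Which brokers come back in the response is decided by the broker, not by this encoding.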
offset fetch request - response issue and wire protocol kafka 0.8
Using https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetFetchRequest as the basis:

OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32

OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16

With CorrelationId: 350001, ClientId: ERLKAFKA, ApiKey: 7, ApiVersion: 0, ConsumerGroup: CG, TopicName: Topic1, Partition: 0, the following bytes are produced for the request:

0,0,0,42,0,7,0,0,208,157,195,1,0,8,69,82,76,75,65,70,75,65,0,2,67,71,0,0,0,1,0,6,84,111,112,105,99,49,0,0,0,1,0,0,0,0

  RequestLength    0,0,0,42
  ApiKey           0,7
  ApiVersion       0,0
  CorrelationId    208,157,195,1
  ClientId         0,8 | 69,82,76,75,65,70,75,65   ("ERLKAFKA")
  ConsumerGroup    0,2 | 67,71                     ("CG")
  Topic count      0,0,0,1
  TopicName        0,6 | 84,111,112,105,99,49      ("Topic1")
  Partition count  0,0,0,1
  Partition        0,0,0,0

I get this response:

0,0,0,10,208,157,195,1,255,255,0,0,0,0

  ResponseLength   0,0,0,10
  CorrelationId    208,157,195,1
  (remaining)      255,255,0,0,0,0

which seems to be wrong according to the wire protocol. Also, on the Kafka console I get:

[2013-06-17 01:25:36,440] ERROR [KafkaApi-1] error when handling request Name: ControlledShutdownRequest; Version: 0; CorrelationId: -794967295; BrokerId: 542034 (kafka.server.KafkaApis)
kafka.common.BrokerNotAvailableException: Broker id 542034 does not exist.
        at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:139)
        at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:134)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:73)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
        at java.lang.Thread.run(Thread.java:722)

What am I doing wrong?

Thanks
Milind
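The bytes above can be checked with a small Python sketch. It decodes the request twice. The first pass uses the OffsetFetchRequest layout quoted from the guide. The second uses a hypothetical alternative layout: the header without a ClientId, followed by an int32 BrokerId. That second layout is an assumption, chosen only because it reproduces the log line.

Under that reading, the CorrelationId bytes 208,157,195,1 decode as a signed big-endian int32 to -794967295 (3500000001 if read unsigned), the same value the log prints. The next four bytes, 0,8,69,82 (the ClientId length prefix plus "ER"), decode to 542034, the BrokerId in the log. This suggests the broker handled ApiKey 7 as the ControlledShutdownRequest named in the log rather than as an OffsetFetchRequest.

The names `Reader`, `decode_as_offset_fetch`, `decode_as_controlled_shutdown` and `decode_response` are illustrative only.

```python
import struct

# Bytes copied from the message above.
REQUEST = bytes([
    0, 0, 0, 42, 0, 7, 0, 0, 208, 157, 195, 1, 0, 8, 69, 82, 76, 75, 65, 70, 75, 65,
    0, 2, 67, 71, 0, 0, 0, 1, 0, 6, 84, 111, 112, 105, 99, 49, 0, 0, 0, 1, 0, 0, 0, 0,
])
RESPONSE = bytes([0, 0, 0, 10, 208, 157, 195, 1, 255, 255, 0, 0, 0, 0])


class Reader:
    """Cursor over a buffer; all integers are big-endian (network byte order)."""

    def __init__(self, buf: bytes) -> None:
        self.buf, self.pos = buf, 0

    def _unpack(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.buf, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._unpack(">h")

    def int32(self) -> int:
        return self._unpack(">i")

    def string(self) -> str:
        # int16 length prefix, then that many bytes.
        n = self.int16()
        s = self.buf[self.pos:self.pos + n].decode("ascii")
        self.pos += n
        return s


def decode_as_offset_fetch(buf: bytes) -> dict:
    """Header + OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]."""
    r = Reader(buf)
    out = {"size": r.int32(), "api_key": r.int16(), "api_version": r.int16(),
           "correlation_id": r.int32(), "client_id": r.string(),
           "group": r.string(), "topics": {}}
    for _ in range(r.int32()):
        topic = r.string()
        partitions = []
        for _ in range(r.int32()):
            partitions.append(r.int32())
        out["topics"][topic] = partitions
    return out


def decode_as_controlled_shutdown(buf: bytes) -> dict:
    """Hypothetical layout (assumption): header WITHOUT ClientId, then int32 BrokerId."""
    r = Reader(buf)
    return {"size": r.int32(), "api_key": r.int16(), "api_version": r.int16(),
            "correlation_id": r.int32(), "broker_id": r.int32()}


def decode_response(buf: bytes) -> tuple:
    """Size, CorrelationId, then the tail read as int16 + int32 (an interpretation)."""
    r = Reader(buf)
    return r.int32(), r.int32(), r.int16(), r.int32()
```

Read as an int16 followed by an int32, the response tail gives -1 and 0. That would fit an error code followed by an empty array, but it does not match the OffsetFetchResponse layout.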
Re: Offset commit api
Perhaps I don't understand the motivation well enough, and perhaps I am misreading the intent. But I thought the design principle behind Kafka was that state (from a consumer standpoint) is managed by the consumer and not by the broker. I understand that "These APIs are optional, clients can store offsets another way if they like." So, three questions:

(a) Is this a change from the original intent of Kafka?

(b) If it is a change, why not make it such that there is no need for clients to roll their own?
    autocommit=false -> no storage of offsets
    autocommit=true  -> store offsets

(c) I suppose that multiple sets of consumer groups wanting to use the offsets for different purposes could be one of the use cases for clients to roll their own. That corner case could be handled by handing out a UUID for each set of consumer groups to operate against. Are there any other use cases where clients absolutely must roll their own?

Regards
Milind

On Mon, Dec 17, 2012 at 10:45 AM, Jay Kreps wrote:
> Hey Guys,
>
> David has made a bunch of progress on the offset commit api implementation.
>
> Since this is a public API it would be good to do as much thinking up-front
> as possible to minimize future iterations.
>
> It would be great if folks could do the following:
> 1. Read the wiki here:
> https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
> 2. Check out the code David wrote here:
> https://issues.apache.org/jira/browse/KAFKA-657
>
> In particular our hope is that this API can act as the first step in
> scaling the way we store offsets (ZK is not really very appropriate for
> this). This of course requires having some plan in mind for offset storage.
> I have written (and then after getting some initial feedback, rewritten) a
> section in the above wiki on how this might work.
>
> If no one says anything I will be taking a slightly modified patch that
> adds this functionality on trunk as soon as David gets in a few minor
> tweaks.
>
> -Jay
Re: Offset commit api
+1 on limiting the size. But could you do 2k instead of 1k? Interval Tree Clocks get you a lot in distributed autonomous processing, but most large-scale ITCs go up to 1.5K. See http://code.google.com/p/itclocks/ (refer to the link to the conference paper).

Regards
Milind

On Thu, Dec 20, 2012 at 2:04 PM, Jay Kreps wrote:
> Err, to clarify, I meant punt on persisting the metadata, not punt on
> persisting the offset. Basically that field would be in the protocol but
> would be unused in this phase.
>
> -Jay
>
> On Thu, Dec 20, 2012 at 2:03 PM, Jay Kreps wrote:
> > I actually recommend we just punt on implementing persistence in zk
> > entirely, otherwise we have to have an upgrade path to grandfather over
> > existing zk data to the new format. Let's just add it in the API and only
> > actually store it out when we redo the backend. We can handle the size
> > limit then too.
> >
> > -Jay
> >
> > On Thu, Dec 20, 2012 at 1:30 PM, David Arthur wrote:
> >> No particular objection, though in order to support atomic writes of
> >> (offset, metadata), we will need to define a protocol for the ZooKeeper
> >> payloads. Something like:
> >>
> >> OffsetPayload => Offset [Metadata]
> >> Metadata => length prefixed string
> >>
> >> should suffice. Otherwise we would have to rely on the multi-write
> >> mechanism to keep parallel znodes in sync (I generally don't like things
> >> like this).
> >>
> >> +1 for limiting the size (1kb sounds reasonable)
> >>
> >> On 12/20/12 4:03 PM, Jay Kreps wrote:
> >>> Okay I did some assessment of use cases we have which aren't using the
> >>> default offset storage API and came up with one generalization. I would
> >>> like to propose--add a generic metadata field to the offset api on a
> >>> per-partition basis.
> >>> So that would leave us with the following:
> >>>
> >>> OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset Metadata]]
> >>>
> >>> OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
> >>>
> >>> Metadata => string
> >>>
> >>> If you want to store a reference to any associated state (say an HDFS
> >>> file name) so that if the consumption fails over the new consumer can
> >>> start up with the same state, this would be a place to store that. It
> >>> would not be intended to support large stuff (we could enforce a 1k limit
> >>> or something), just something small or a reference to where to find the
> >>> state (say a file name).
> >>>
> >>> Objections?
> >>>
> >>> -Jay
> >>>
> >>> On Mon, Dec 17, 2012 at 10:45 AM, Jay Kreps wrote:
> >>>> Hey Guys,
> >>>>
> >>>> David has made a bunch of progress on the offset commit api
> >>>> implementation.
> >>>>
> >>>> Since this is a public API it would be good to do as much thinking
> >>>> up-front as possible to minimize future iterations.
> >>>>
> >>>> It would be great if folks could do the following:
> >>>> 1. Read the wiki here:
> >>>> https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
> >>>> 2. Check out the code David wrote here:
> >>>> https://issues.apache.org/jira/browse/KAFKA-657
> >>>>
> >>>> In particular our hope is that this API can act as the first step in
> >>>> scaling the way we store offsets (ZK is not really very appropriate
> >>>> for this). This of course requires having some plan in mind for offset
> >>>> storage. I have written (and then after getting some initial feedback,
> >>>> rewritten) a section in the above wiki on how this might work.
> >>>>
> >>>> If no one says anything I will be taking a slightly modified patch
> >>>> that adds this functionality on trunk as soon as David gets in a few
> >>>> minor tweaks.
> >>>>
> >>>> -Jay
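The two formats proposed in this thread can be sketched in Python:

- Jay's per-partition Metadata field in OffsetCommitRequest.
- David's single-znode ZooKeeper payload (OffsetPayload => Offset [Metadata]).

The sketch rests on a few assumptions. Strings use the protocol's int16 length prefix. David's `[Metadata]` is read as an always-present string that is empty when unused. The limit is the 1 KB figure discussed here; Milind's 2k suggestion would only change `MAX_METADATA_BYTES`. All function names are hypothetical, and nothing here is taken from the KAFKA-657 patch.

```python
import struct

MAX_METADATA_BYTES = 1024  # the "1k" limit floated in this thread (assumption)


def kafka_string(s: str, limit: int = 32767) -> bytes:
    """Protocol string: int16 big-endian byte length, then the UTF-8 bytes."""
    raw = s.encode("utf-8")
    if len(raw) > limit:
        raise ValueError(f"string is {len(raw)} bytes; limit is {limit}")
    return struct.pack(">h", len(raw)) + raw


def encode_offset_commit_body(group: str, offsets: dict) -> bytes:
    """OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset Metadata]].

    `offsets` maps topic -> {partition: (offset, metadata)}. The Size prefix and
    the common request header are left out to keep the sketch short.
    """
    out = kafka_string(group) + struct.pack(">i", len(offsets))
    for topic, partitions in offsets.items():
        out += kafka_string(topic) + struct.pack(">i", len(partitions))
        for partition, (offset, metadata) in partitions.items():
            out += struct.pack(">iq", partition, offset)       # int32 + int64
            out += kafka_string(metadata, MAX_METADATA_BYTES)   # size-limited
    return out


def encode_zk_offset_payload(offset: int, metadata: str = "") -> bytes:
    """OffsetPayload => Offset [Metadata], packed into one znode value."""
    return struct.pack(">q", offset) + kafka_string(metadata, MAX_METADATA_BYTES)


def decode_zk_offset_payload(data: bytes) -> tuple:
    offset, n = struct.unpack_from(">qh", data, 0)
    start = struct.calcsize(">qh")  # 10 bytes: int64 + int16, no padding with ">"
    return offset, data[start:start + n].decode("utf-8")
```

Keeping offset and metadata in one value is what lets a single znode write update both, which avoids keeping parallel znodes in sync as David describes.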