Yes, I think we are saying the same thing. Basically, I am saying version 0 should be considered frozen at the format and behavior of the 0.8.1 release, and we can do whatever we want in version 1+.
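The version-as-feature-flag dispatch being agreed on here (version 0 frozen at 0.8.1 behavior, version 1+ free to change) could be sketched roughly as follows. This is a hypothetical illustration, not the actual KafkaApis code; the function and store names are made up:

```python
# Hypothetical sketch of dispatching on the request's api version so that
# version 0 keeps the frozen 0.8.1 behavior (offsets to ZooKeeper) while
# version 1+ uses the new Kafka-based offset storage. Plain dicts stand in
# for the two stores; none of these names come from the Kafka codebase.

def handle_offset_commit(version, offsets, zk_store, kafka_store):
    """Route a commit by api version: 0 -> ZooKeeper-backed (frozen
    0.8.1 behavior), 1+ -> Kafka-based offset storage."""
    if version == 0:
        zk_store.update(offsets)      # old functionality, unchanged
        return "zookeeper"
    elif version >= 1:
        kafka_store.update(offsets)   # new functionality
        return "kafka"
    raise ValueError("unsupported OffsetCommitRequest version: %d" % version)

zk, kafka = {}, {}
print(handle_offset_commit(0, {("t", 0): 5}, zk, kafka))  # -> zookeeper
print(handle_offset_commit(1, {("t", 0): 9}, zk, kafka))  # -> kafka
```

With this shape, old clients keep working untouched and nothing breaks for new ones.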
-Jay

On Wed, Jan 7, 2015 at 10:10 AM, Joe Stein <joe.st...@stealth.ly> wrote:

> << We need to take the versioning of the protocol seriously
>
> amen
>
> << People are definitely using the offset commit functionality in 0.8.1
>
> agreed
>
> << I really think we should treat this as a bug and revert the change to
> version 0.
>
> What do you mean exactly by revert? Why can't we use the version as a
> feature flag and support 0 and 1 at the same time? In the handleOffsetFetch
> and handleOffsetCommit functions that process the request messages, just
> do: if version == 0, old functionality; else if version == 1, new
> functionality. This way everyone works and nothing breaks =8^)
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey guys,
> >
> > We need to take the versioning of the protocol seriously. People are
> > definitely using the offset commit functionality in 0.8.1 and I really
> > think we should treat this as a bug and revert the change to version 0.
> >
> > -Jay
> >
> > On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Yes, we did make an incompatible change in OffsetCommitRequest in
> > > 0.8.2, which is a mistake. The incompatible change was introduced in
> > > KAFKA-1012 in Mar 2014, when we added the Kafka-based offset
> > > management support. However, we didn't realize that this breaks the
> > > wire protocol until much later. Now, the wire protocol has evolved
> > > again and it's a bit hard to fix the format in version 0. I can see a
> > > couple of options.
> > >
> > > Option 1: Just accept the incompatible change as it is.
> > > The argument is that even though we introduced OffsetCommitRequest in
> > > 0.8.1, it's not used in the high level consumer. It's possible that
> > > some users of SimpleConsumer started using it. However, that number is
> > > likely small. Also, the functionality of OffsetCommitRequest has
> > > changed, since it's writing the offset to a Kafka log instead of ZK
> > > (for good reasons). So, we can document this as a wire protocol and
> > > functionality incompatible change. For users who don't mind the
> > > functionality change, they will need to upgrade the client to the new
> > > protocol before they can use the new broker. For users who want to
> > > preserve the old functionality, they will have to write the offsets
> > > directly to ZK. In either case, hopefully the number of people being
> > > affected is small.
> > >
> > > Option 2: Revert version 0 format to what's in 0.8.1.
> > > There will be a few issues here. First, it's not clear how this
> > > affects other people who have been deploying from trunk. Second, I am
> > > not sure that we want to continue supporting writing the offset to ZK
> > > in OffsetCommitRequest, since that can cause ZK to be overloaded.
> > >
> > > Joel Koshy,
> > >
> > > Any thoughts on this?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein <joe.st...@stealth.ly> wrote:
> > >
> > > > In addition to the issue you bring up, the functionality as a whole
> > > > has changed. When you call OffsetFetchRequest, version = 0 needs to
> > > > preserve the old functionality
> > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
> > > > and version = 1 the new
> > > > https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
> > > > .
> > > > Also, even though the wire protocol for OffsetFetchRequest is the
> > > > same after the 0.8.2 upgrade, if you were using the 0.8.1.1
> > > > OffsetFetchRequest
> > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
> > > > it will stop going to zookeeper and start going to Kafka storage
> > > > https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
> > > > so more errors will happen and things break too.
> > > >
> > > > I think we should treat the version field not just as a way to keep
> > > > from breaking the wire protocol calls, but also as a "feature flag"
> > > > preserving upgrades and multiple pathways.
> > > >
> > > > I updated the JIRA for the feature flag needs for OffsetFetch and
> > > > OffsetCommit too.
> > > >
> > > > /*******************************************
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > > > ********************************************/
> > > >
> > > > On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers <dana.pow...@rd.io> wrote:
> > > >
> > > > > ok, opened KAFKA-1841. KAFKA-1634 also related.
> > > > >
> > > > > -Dana
> > > > >
> > > > > On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > >
> > > > > > Ooh, I see what you mean - the OffsetAndMetadata (or
> > > > > > PartitionData) part of the Map changed, which will modify the
> > > > > > wire protocol.
> > > > > >
> > > > > > This is actually not handled in the Java client either. It will
> > > > > > send the timestamp no matter which version is used.
> > > > > >
> > > > > > This looks like a bug and I'd even mark it as a blocker for
> > > > > > 0.8.2, since it may prevent rolling upgrades.
> > > > > > Are you opening the JIRA?
> > > > > >
> > > > > > Gwen
> > > > > >
> > > > > > On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers <dana.pow...@rd.io> wrote:
> > > > > >
> > > > > > > specifically comparing 0.8.1 --
> > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > > > > > > ```
> > > > > > > (1 to partitionCount).map(_ => {
> > > > > > >   val partitionId = buffer.getInt
> > > > > > >   val offset = buffer.getLong
> > > > > > >   val metadata = readShortString(buffer)
> > > > > > >   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
> > > > > > > })
> > > > > > > ```
> > > > > > >
> > > > > > > to trunk --
> > > > > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > > > > > > ```
> > > > > > > (1 to partitionCount).map(_ => {
> > > > > > >   val partitionId = buffer.getInt
> > > > > > >   val offset = buffer.getLong
> > > > > > >   val timestamp = {
> > > > > > >     val given = buffer.getLong
> > > > > > >     if (given == -1L) now else given
> > > > > > >   }
> > > > > > >   val metadata = readShortString(buffer)
> > > > > > >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
> > > > > > > })
> > > > > > > ```
> > > > > > >
> > > > > > > should the `timestamp` buffer read be wrapped in an api
> > > > > > > version check?
> > > > > > >
> > > > > > > Dana Powers
> > > > > > > Rdio, Inc.
> > > > > > > dana.pow...@rd.io
> > > > > > > rdio.com/people/dpkp/
> > > > > > >
> > > > > > > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <gshap...@cloudera.com> wrote:
> > > > > > >
> > > > > > >> Ah, I see :)
> > > > > > >>
> > > > > > >> The readFrom function basically tries to read two extra
> > > > > > >> fields if you are on version 1:
> > > > > > >>
> > > > > > >>   if (versionId == 1) {
> > > > > > >>     groupGenerationId = buffer.getInt
> > > > > > >>     consumerId = readShortString(buffer)
> > > > > > >>   }
> > > > > > >>
> > > > > > >> The rest looks identical in version 0 and 1, and still no
> > > > > > >> timestamp in sight...
> > > > > > >>
> > > > > > >> Gwen
> > > > > > >>
> > > > > > >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.pow...@rd.io> wrote:
> > > > > > >> > Hi Gwen, I am using/writing kafka-python to construct api
> > > > > > >> > requests and have not dug too deeply into the server source
> > > > > > >> > code. But I believe it is kafka/api/OffsetCommitRequest.scala
> > > > > > >> > and specifically the readFrom method used to decode the
> > > > > > >> > wire protocol.
> > > > > > >> >
> > > > > > >> > -Dana
> > > > > > >> >
> > > > > > >> > OffsetCommitRequest has two constructors now:
> > > > > > >> >
> > > > > > >> > For version 0:
> > > > > > >> > OffsetCommitRequest(String groupId, Map<TopicPartition,
> > > > > > >> > PartitionData> offsetData)
> > > > > > >> >
> > > > > > >> > And version 1:
> > > > > > >> > OffsetCommitRequest(String groupId, int generationId, String
> > > > > > >> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> > > > > > >> >
> > > > > > >> > None of them seem to require timestamps... so I'm not sure
> > > > > > >> > where you see that this is required. Can you share an example?
> > > > > > >> > Gwen
> > > > > > >> >
> > > > > > >> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.pow...@rd.io> wrote:
> > > > > > >> >> Hi Joel,
> > > > > > >> >>
> > > > > > >> >> I'm looking more closely at the OffsetCommitRequest wire
> > > > > > >> >> protocol change you mentioned below, and I cannot figure
> > > > > > >> >> out how to explicitly construct a request with the earlier
> > > > > > >> >> version. Should the api version be different for requests
> > > > > > >> >> that do not include it and/or servers that do not support
> > > > > > >> >> the timestamp field? It looks like 0.8.1.1 did not include
> > > > > > >> >> the timestamp field and used api version 0. But 0.8.2-beta
> > > > > > >> >> seems to now require timestamps even when I explicitly
> > > > > > >> >> encode OffsetCommitRequest api version 0 (the server logs
> > > > > > >> >> a BufferUnderflowException).
> > > > > > >> >>
> > > > > > >> >> Is this the expected server behavior? Can you provide any
> > > > > > >> >> tips on how third-party clients should manage the
> > > > > > >> >> wire-protocol change for this api method (I'm working on
> > > > > > >> >> kafka-python)?
> > > > > > >> >>
> > > > > > >> >> Thanks,
> > > > > > >> >>
> > > > > > >> >> -Dana
> > > > > > >> >>
> > > > > > >> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > >> >>
> > > > > > >> >>> Yes, it should be backwards compatible. So, for e.g., you
> > > > > > >> >>> should be able to use an 0.8.1 client with an 0.8.2
> > > > > > >> >>> broker. In general, you should not upgrade your clients
> > > > > > >> >>> until after the brokers have been upgraded. However, you
> > > > > > >> >>> can point an 0.8.2 client at an 0.8.1 broker.
> > > > > > >> >>> One wire protocol change I'm aware of is the
> > > > > > >> >>> OffsetCommitRequest. There is a change in the
> > > > > > >> >>> OffsetCommitRequest format (KAFKA-1634), although you can
> > > > > > >> >>> explicitly construct an OffsetCommitRequest with the
> > > > > > >> >>> earlier version.
> > > > > > >> >>>
> > > > > > >> >>> Thanks,
> > > > > > >> >>>
> > > > > > >> >>> Joel
> > > > > > >> >>>
> > > > > > >> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> > > > > > >> >>> > Hi Joel,
> > > > > > >> >>> >
> > > > > > >> >>> > Thanks for all the clarifications! Just another question
> > > > > > >> >>> > on this: will 0.8.2 be backwards compatible with 0.8.1,
> > > > > > >> >>> > just as 0.8.1 was with 0.8? Generally speaking, would
> > > > > > >> >>> > there be any concerns with using the 0.8.2 consumer with
> > > > > > >> >>> > a 0.8.1 broker, for instance?
> > > > > > >> >>> >
> > > > > > >> >>> > Marius
> > > > > > >> >>> >
> > > > > > >> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > >> >>> >
> > > > > > >> >>> > > Inline..
> > > > > > >> >>> > >
> > > > > > >> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > > > > > >> >>> > > > Hello everyone,
> > > > > > >> >>> > > >
> > > > > > >> >>> > > > I have a few questions about the current status and
> > > > > > >> >>> > > > future of the Kafka consumers.
> > > > > > >> >>> > > >
> > > > > > >> >>> > > > We have been working on adding Kafka support to
> > > > > > >> >>> > > > Spring XD [1], currently using the high level
> > > > > > >> >>> > > > consumer via Spring Integration Kafka [2].
> > > > > > >> >>> > > > We are working on adding features such as:
> > > > > > >> >>> > > > - the ability to control offsets/replay topics;
> > > > > > >> >>> > > > - the ability to control partition allocation
> > > > > > >> >>> > > >   across multiple consumers.
> > > > > > >> >>> > > >
> > > > > > >> >>> > > > We are currently at version 0.8.1.1, so using the
> > > > > > >> >>> > > > simple consumer is a pretty straightforward choice
> > > > > > >> >>> > > > right now. However, in light of the upcoming
> > > > > > >> >>> > > > consumer changes for 0.8.2 and 0.9, I have a few
> > > > > > >> >>> > > > questions:
> > > > > > >> >>> > > >
> > > > > > >> >>> > > > 1) With respect to the consumer redesign for 0.9,
> > > > > > >> >>> > > > what is the future of the Simple Consumer and High
> > > > > > >> >>> > > > Level Consumer? To my best understanding, the
> > > > > > >> >>> > > > existing high level consumer API will be deprecated
> > > > > > >> >>> > > > in favour of the new consumer API. What is the
> > > > > > >> >>> > > > future of the Simple Consumer, in this case? Will
> > > > > > >> >>> > > > it continue to exist as a low-level API
> > > > > > >> >>> > > > implementing the Kafka protocol [3] and providing
> > > > > > >> >>> > > > the building blocks for the new consumer, or will
> > > > > > >> >>> > > > it be deprecated as well?
> > > > > > >> >>> > >
> > > > > > >> >>> > > The new consumer will subsume both use-cases (simple
> > > > > > >> >>> > > and high-level).
> > > > > > >> >>> > > You can still use the old SimpleConsumer if you wish
> > > > > > >> >>> > > - i.e., the wire protocol for fetch and other
> > > > > > >> >>> > > requests will still be supported.
> > > > > > >> >>> > >
> > > > > > >> >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase
> > > > > > >> >>> > > > contains an early implementation of it, but since
> > > > > > >> >>> > > > this is a feature scheduled only for 0.9, what is
> > > > > > >> >>> > > > its status as well? Is it included only as a future
> > > > > > >> >>> > > > reference and for stabilizing the API?
> > > > > > >> >>> > >
> > > > > > >> >>> > > It is a WIP so you cannot really use it.
> > > > > > >> >>> > >
> > > > > > >> >>> > > > 3) Obviously, offset management is a concern if
> > > > > > >> >>> > > > using the simple consumer, so - wondering about the
> > > > > > >> >>> > > > Offset Management API as well. The Kafka protocol
> > > > > > >> >>> > > > document specifically indicates that it will be
> > > > > > >> >>> > > > fully functional in 0.8.2 [4] - however, a
> > > > > > >> >>> > > > functional implementation is already available in
> > > > > > >> >>> > > > 0.8.1.1 (accessible via the SimpleConsumer API but
> > > > > > >> >>> > > > not documented in [5]).
> > > > > > >> >>> > > > Again, trying to understand the extent of what
> > > > > > >> >>> > > > 0.8.1.1 already supports (ostensibly, the offset
> > > > > > >> >>> > > > manager support seems to have been added only in
> > > > > > >> >>> > > > 0.8.2 - please correct me if I am wrong), and
> > > > > > >> >>> > > > whether it is recommended for use in production in
> > > > > > >> >>> > > > any form (with the caveats that accompany the use
> > > > > > >> >>> > > > of ZooKeeper).
> > > > > > >> >>> > >
> > > > > > >> >>> > > In 0.8.2 the OffsetCommitRequest and
> > > > > > >> >>> > > OffsetFetchRequest will use Kafka as the offsets
> > > > > > >> >>> > > storage mechanism (not zookeeper). High-level Java
> > > > > > >> >>> > > consumers can choose to store offsets in ZooKeeper
> > > > > > >> >>> > > instead by setting offsets.storage=zookeeper
> > > > > > >> >>> > >
> > > > > > >> >>> > > However, if you are using the simple consumer and
> > > > > > >> >>> > > wish to store offsets in ZooKeeper you will need to
> > > > > > >> >>> > > commit to ZooKeeper directly. You can use ZkUtils in
> > > > > > >> >>> > > the kafka.utils package for this.
> > > > > > >> >>> > >
> > > > > > >> >>> > > If you wish to move to Kafka-based offsets we will be
> > > > > > >> >>> > > adding a new OffsetsClient that can be used to
> > > > > > >> >>> > > commit/fetch offsets to/from Kafka. This is currently
> > > > > > >> >>> > > not listed as a blocker for 0.8.2 but I think we
> > > > > > >> >>> > > should include it. I will update that ticket.
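[Editor's note: committing directly to ZooKeeper from a simple consumer, as suggested above, amounts to writing the offset under the 0.8-era consumer path. A minimal sketch follows; a plain dict stands in for the ZK tree so the path logic is runnable, whereas a real client would use ZkUtils or a ZooKeeper library such as kazoo. The helper names are hypothetical.]

```python
# Minimal sketch of direct-to-ZooKeeper offset commits, assuming the
# 0.8-era layout /consumers/<group>/offsets/<topic>/<partition>.
# `zk_tree` is a dict standing in for a real ZooKeeper client.

def offset_path(group, topic, partition):
    # Hypothetical helper: build the znode path for one partition's offset.
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def commit_offset(zk_tree, group, topic, partition, offset):
    # ZK stores znode data as bytes/strings, so the offset is stringified.
    zk_tree[offset_path(group, topic, partition)] = str(offset)

def fetch_offset(zk_tree, group, topic, partition):
    # Return -1 when no offset has been committed yet.
    return int(zk_tree.get(offset_path(group, topic, partition), "-1"))

tree = {}
commit_offset(tree, "my-group", "my-topic", 0, 1234)
print(fetch_offset(tree, "my-group", "my-topic", 0))  # -> 1234
```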
> > > > > > >> >>> > > > 4) Trying to interpret the existing examples in
> > > > > > >> >>> > > > [6] and the comments on [7] - the version of the
> > > > > > >> >>> > > > Offset Management API that exists in 0.8.1.1 is
> > > > > > >> >>> > > > using ZooKeeper - whereas ZooKeeper will be
> > > > > > >> >>> > > > optional in 0.8.2 - to be replaced by Kafka, and
> > > > > > >> >>> > > > phased out if possible. To my understanding, the
> > > > > > >> >>> > > > switch between the two will be controlled by the
> > > > > > >> >>> > > > broker configuration (along with other parameters
> > > > > > >> >>> > > > that control the performance of offset queues). Is
> > > > > > >> >>> > > > that correct?
> > > > > > >> >>> > >
> > > > > > >> >>> > > The switch is a client-side configuration. That wiki
> > > > > > >> >>> > > is not up-to-date. The most current documentation is
> > > > > > >> >>> > > available as a patch in
> > > > > > >> >>> > > https://issues.apache.org/jira/browse/KAFKA-1729
> > > > > > >> >>> > >
> > > > > > >> >>> > > > 5) Also, wondering about the timeline of 0.8.2 -
> > > > > > >> >>> > > > according to the roadmaps it should be released
> > > > > > >> >>> > > > relatively shortly. Is that correct?
> > > > > > >> >>> > >
> > > > > > >> >>> > > Yes - once the blockers are ironed out.
> > > > > > >> >>> > > > Thanks,
> > > > > > >> >>> > > > Marius
> > > > > > >> >>> > > >
> > > > > > >> >>> > > > [1] http://projects.spring.io/spring-xd/
> > > > > > >> >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
> > > > > > >> >>> > > > [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > > > >> >>> > > > [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > > > > > >> >>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> > > > > > >> >>> > > > [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > > > > > >> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
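[Editor's note: for third-party client authors following this thread, the incompatibility Dana hit can be reproduced from the two readFrom snippets quoted above. The sketch below infers the per-partition field layout from those snippets (this is not kafka-python code) and shows why a reader that unconditionally consumes the 8-byte timestamp underflows on a version-0 entry, which is what surfaces as the broker's BufferUnderflowException.]

```python
import struct

def encode_partition_v0(partition, offset, metadata):
    # 0.8.1 layout per partition: int32 partition, int64 offset,
    # short-string metadata (int16 length + bytes), big-endian.
    m = metadata.encode("utf-8")
    return struct.pack(">iqh", partition, offset, len(m)) + m

def encode_partition_v1(partition, offset, timestamp, metadata):
    # Trunk-era layout: an int64 timestamp inserted before the metadata.
    m = metadata.encode("utf-8")
    return struct.pack(">iqqh", partition, offset, timestamp, len(m)) + m

def decode_partition_trunk(buf):
    # Trunk-style reader: always consumes the timestamp, mirroring the
    # unconditional buffer.getLong in the quoted readFrom snippet.
    partition, offset, timestamp = struct.unpack_from(">iqq", buf, 0)
    (mlen,) = struct.unpack_from(">h", buf, 20)
    metadata = buf[22:22 + mlen].decode("utf-8")
    return partition, offset, timestamp, metadata

print(decode_partition_trunk(encode_partition_v1(0, 42, -1, "meta")))
# -> (0, 42, -1, 'meta')

try:
    decode_partition_trunk(encode_partition_v0(0, 42, "meta"))
except struct.error as e:
    # Python's analog of BufferUnderflowException: the v0 entry is 8
    # bytes too short for a reader that expects a timestamp field.
    print("underflow:", e)
```

Wrapping the timestamp read in an api-version check, as Dana asks, is exactly what would make the same reader accept both layouts.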