[jira] [Work started] (KAFKA-3915) LogCleaner IO buffers do not account for potential size difference due to message format change
[ https://issues.apache.org/jira/browse/KAFKA-3915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on KAFKA-3915 started by Ismael Juma.
--
> LogCleaner IO buffers do not account for potential size difference due to message format change
> ---
>
>                 Key: KAFKA-3915
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3915
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.0.0
>            Reporter: Tommy Becker
>            Assignee: Ismael Juma
>            Priority: Blocker
>             Fix For: 0.10.0.1
>
>
> We are upgrading from Kafka 0.8.1 to 0.10.0.0 and discovered an issue after getting the following exception from the log cleaner:
> {code}
> [2016-06-28 10:02:18,759] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>     at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>     at kafka.message.ByteBufferMessageSet$.writeMessage(ByteBufferMessageSet.scala:169)
>     at kafka.log.Cleaner$$anonfun$cleanInto$1.apply(LogCleaner.scala:435)
>     at kafka.log.Cleaner$$anonfun$cleanInto$1.apply(LogCleaner.scala:429)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:429)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:380)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:376)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:376)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:343)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:342)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:342)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:215)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> At first this seems impossible because the input and output buffers are identically sized. But in the case where the source messages are of an older format, additional space may be required to write them out in the new one. Since the message header is 8 bytes larger in 0.10.0, this failure can happen.
> We're planning to work around this by adding the following config:
> {code}log.message.format.version=0.8.1{code}
> but this definitely needs a fix. We could simply preserve the existing message format (since in this case we can't retroactively add a timestamp anyway). Otherwise, the log cleaner would have to be smarter about ensuring there is sufficient "slack space" in the output buffer to account for the size difference * the number of messages in the input buffer.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
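The "slack space" idea in the report can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not the LogCleaner code or the eventual fix: the class `CleanerBufferSizing` and its methods are hypothetical names, and the only figure taken from the report is the 8-byte per-message header growth.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; not part of Kafka. */
final class CleanerBufferSizing {
    /** Per-message header growth for the 0.10.0 format, as stated in the report. */
    static final int HEADER_GROWTH_BYTES = 8;

    /**
     * Worst-case output size if every message in the input buffer is
     * rewritten in the newer format: input size plus growth per message.
     */
    static int requiredOutputCapacity(int inputBytes, int messageCount) {
        return Math.addExact(inputBytes,
                Math.multiplyExact(messageCount, HEADER_GROWTH_BYTES));
    }

    /**
     * Returns the given (assumed empty) output buffer if it is already large
     * enough, otherwise a fresh buffer with the worst-case capacity.
     */
    static ByteBuffer ensureCapacity(ByteBuffer out, int inputBytes, int messageCount) {
        int needed = requiredOutputCapacity(inputBytes, messageCount);
        return out.capacity() >= needed ? out : ByteBuffer.allocate(needed);
    }
}
```

With a 1024-byte input buffer holding 10 old-format messages, the worst case is 1024 + 10 * 8 = 1104 bytes, so an output buffer of the same 1024 bytes could overflow exactly as in the stack trace above.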
Re: [VOTE] KIP-67: Queryable state for Kafka Streams
Hi,
Jay's interpretation is correct.
Thanks,
Damian

On Fri, 15 Jul 2016 at 16:10, Jay Kreps wrote:

> My interpretation was that you need an implementation of
> QueryableStoreType which anyone can do and QueryableStoreTypes is just a
> place to put the type objects for the types we ship with Kafka.
>
> -Jay
>
> On Fri, Jul 15, 2016 at 4:04 PM, Sriram Subramanian wrote:
>
> > So, it looks like QueryableStoreTypes would be part of the streams library,
> > right? If a developer needs to support a new store, would they need to
> > change code in streams?
> >
> > On Fri, Jul 15, 2016 at 3:15 PM, Jay Kreps wrote:
> >
> > > Cool, I'm +1 after the updates.
> > >
> > > -Jay
> > >
> > > On Fri, Jul 15, 2016 at 1:50 PM, Damian Guy wrote:
> > >
> > > > Hi Guozhang, KIP updated.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Fri, 15 Jul 2016 at 13:15 Guozhang Wang wrote:
> > > >
> > > > > Hi Damian,
> > > > >
> > > > > Since the StateStoreProvider is moved into internal packages, how about
> > > > > just keeping the ReadOnlyXXStores interface for the queryAPI, and
> > > > > "QueryableStoreType" in the discoverAPI, and move the StateStoreProvider
> > > > > / QueryableStoreTypeMatcher and different implementations of the matcher
> > > > > like KeyValueStoreType / etc in a new section called "developer guide for
> > > > > customized stores"? This way we have a separate guidance for Streams users,
> > > > > from Streams developers.
> > > > >
> > > > > Other than that, all LGTM.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Jul 15, 2016 at 9:57 AM, Damian Guy wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I've updated the KIP with changes as discussed in this Thread.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Wed, 13 Jul 2016 at 16:51 Ismael Juma wrote:
> > > > > >
> > > > > > > I think it's important to distinguish the use cases of defining new stores
> > > > > > > (somewhat rare) versus using the `store` method (very common). The strategy
> > > > > > > employed here is a common way to use generics to ensure type safety for the
> > > > > > > latter case. In the former case, there are all sorts of weird things one
> > > > > > > could do to defeat the type system, but spending a bit more effort to get
> > > > > > > it right so that the common case is safer and more pleasant is worth it, in
> > > > > > > my opinion.
> > > > > > >
> > > > > > > Ismael
> > > > > > >
> > > > > > > On Thu, Jul 14, 2016 at 12:23 AM, Damian Guy <damian@gmail.com> wrote:
> > > > > > >
> > > > > > > > Yes, you get compile time errors
> > > > > > > >
> > > > > > > > On Wed, 13 Jul 2016 at 16:22 Damian Guy <damian@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > You wont get a runtime error as you wouldn't find a store of that type.
> > > > > > > > > The API would return null
> > > > > > > > >
> > > > > > > > > On Wed, 13 Jul 2016 at 16:22 Jay Kreps wrote:
> > > > > > > > >
> > > > > > > > >> But if "my-store" is not of type MyStoreType don't you still get a run time
> > > > > > > > >> error that in effect is the same as the class cast would be? Basically the
> > > > > > > > >> question I'm asking is whether this added complexity is actually moving
> > > > > > > > >> runtime errors to compile time errors.
> > > > > > > > >>
> > > > > > > > >> -Jay
> > > > > > > > >>
> > > > >
> > > > > --
> > > > > -- Guozhang
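The type-safety point debated in this thread (a typed `store` lookup that yields null on a type mismatch instead of a ClassCastException at the call site) can be sketched as follows. All names here (`StoreType`, `ClassStoreType`, `StoreRegistry`) are hypothetical stand-ins chosen for illustration; this is not the KIP-67 API, only a minimal sketch of the generics pattern Ismael describes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical counterpart of a "queryable store type": knows what it accepts and how to view it. */
interface StoreType<T> {
    boolean accepts(Object store);
    T cast(Object store);
}

/** Simplest possible store type, backed by a Class token. */
final class ClassStoreType<T> implements StoreType<T> {
    private final Class<T> cls;

    ClassStoreType(Class<T> cls) {
        this.cls = cls;
    }

    @Override
    public boolean accepts(Object store) {
        return cls.isInstance(store);
    }

    @Override
    public T cast(Object store) {
        return cls.cast(store);
    }
}

/** Hypothetical registry of named stores with a typed lookup. */
final class StoreRegistry {
    private final Map<String, Object> stores = new HashMap<>();

    void register(String name, Object store) {
        stores.put(name, store);
    }

    /**
     * The caller gets back a T without casting. If no store with that name
     * exists, or it is not of the requested type, the result is null.
     */
    <T> T store(String name, StoreType<T> type) {
        Object candidate = stores.get(name);
        return (candidate != null && type.accepts(candidate)) ? type.cast(candidate) : null;
    }
}
```

In this sketch, assigning the result of `store("my-store", someStringType)` to an `Integer` variable is rejected by the compiler, which is the common case the thread wants to make safe; asking for a type the named store does not have simply returns null.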
Re: [VOTE] KIP-67: Queryable state for Kafka Streams
Hi,
The vote is now complete and KIP-67 has been accepted and adopted.
Thanks everyone for the input etc.

Regards,
Damian

On Sat, 16 Jul 2016 at 06:53 Damian Guy wrote:

> Hi,
> Jay's interpretation is correct.
> Thanks,
> Damian
Re: [DISCUSS] KIP-66 Kafka Connect Transformers for messages
Gwen,

Yup, that sounds great! Instead of leaving it up to the transformers to handle null, we can have the topic as null. Sounds good. To get rid of a message, set the topic to a special one (could be as simple as null).

Like I said before, the more interesting part would be 'adding' a new message to the existing list, based on, say, the current message in the transformer. Does that feature warrant inclusion?

> On Jul 14, 2016, at 22:25, Gwen Shapira wrote:
>
> I used to work on Apache Flume, where we used to allow users to filter
> messages completely in the transformation and then we got rid of it,
> because we spent too much time trying to help users who had "message
> loss", where the loss was actually a bug in the filter...
>
> What we couldn't do in Flume, but perhaps can do in the simple
> transform for Connect is the ability to route messages to different
> topics, with "null" as one of the possible targets. This will allow
> you to implement a dead-letter-queue functionality and redirect
> messages that don't pass filter to an "errors" topic without getting
> rid of them completely, while also allowing braver users to get rid of
> messages by directing them to "null".
>
> Does that make sense?
>
> Gwen
>
> On Thu, Jul 14, 2016 at 8:33 PM, Nisarg Shah wrote:
>> Thank you for your inputs Gwen and Michael.
>>
>> The original reason why I suggested a set based processing is because of the
>> flexibility it provides. The JIRA had a comment by a user requesting a
>> feature that could be achieved with this.
>>
>> After reading Gwen and Michael's points, I went through the documentation
>> and the code in detail and agree with what you have to say. Also, fewer
>> guarantees make what I had in mind less certain and thus simplifying it to a
>> single message based transformation would ensure that users who do require
>> more flexibility with the transformations will automatically "turn to" Kafka
>> Streams. The transformation logic on a message by message basis makes more
>> sense.
>>
>> One use case that Kafka Connect could consider is adding or removing a
>> message completely. (This was trivially possible with the collection
>> passing). Should users be pointed towards Kafka Streams even for this use
>> case? I think this is a very useful feature for Connect too, and I'll try to
>> rethink the API too.
>>
>> Removing a message is as easy as returning a null and having the next
>> transformer skip it, but adding messages would involve say a queue between
>> transformers and a method which says "pass message" to the next, which can
>> be called multiple times from one "transform" function; a variation on the
>> chain of responsibility design pattern.
>>
>>> On Jul 12, 2016, at 12:54 AM, Michael Noll wrote:
>>>
>>> As Gwen said, my initial thought is that message transformations that are
>>> "more than trivial" should rather be done by Kafka Streams, rather than by
>>> Kafka Connect (for the reasons that Gwen mentioned).
>>>
>>> Transforming one message at a time would be a good fit for Kafka Connect.
>>> An important use case is to remove sensitive data (such as PII) from an
>>> incoming data stream before it hits Kafka's persistent storage -- this use
>>> case can't be implemented well with Kafka Streams because, by design, Kafka
>>> Streams is meant to read its input data from Kafka (i.e. at the point when
>>> Kafka Streams could be used to remove sensitive data fields the data is
>>> already stored persistently in Kafka, and this might be a no-go depending
>>> on the use case).
>>>
>>> I'm of course interested to hear what other people think.
>>>
>>>
>>> On Tue, Jul 12, 2016 at 6:06 AM, Gwen Shapira wrote:
>>>
>>> I think we need to restrict the functionality to one-message-at-a-time.
>>>
>>> Basically, Connect gives very few guarantees about the size of the set
>>> or its composition (you may get same messages over and over, mix of old
>>> and new, etc).
>>>
>>> In order to do useful things over a collection, you need better defined
>>> semantics of what's included. Kafka Streams is putting tons of effort into
>>> having good windowing semantics, and I think apps that require
>>> modification of collections are a better fit there.
>>>
>>> I'm willing to change my mind though (have been known to happen) - what
>>> are the comments about usage that point toward the collections approach?
>>>
>>> Gwen
>>>
>>> On Mon, Jul 11, 2016 at 3:32 PM, Nisarg Shah wrote:
>>> > Thanks Jay, added that to the KIP.
>>> >
>>> > Besides reviewing the KIP as a whole, I wanted to know about what everyone
>>> > thinks about what data should be dealt at the Transformer level. Transform
>>> > the whole Collection of Records (giving the flexibility of modifying
>>> > messages across the set) OR
>>> > Transform messages one at a time, iteratively. This will restrict
>>> > modifications across messages.
>>> >
>>> > I’ll get a
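The one-message-at-a-time model discussed in this thread, with rerouting to an "errors" topic and a null topic meaning "discard", could look roughly like the sketch below. It is a plain-Java illustration only: `RoutedMessage` and `TransformChain` are hypothetical names, and nothing here is the KIP-66 interface or any Kafka Connect API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical message: a value plus the topic it is currently headed for. */
final class RoutedMessage {
    final String topic; // null means "discard this message"
    final String value;

    RoutedMessage(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }

    RoutedMessage withTopic(String newTopic) {
        return new RoutedMessage(newTopic, value);
    }

    RoutedMessage withValue(String newValue) {
        return new RoutedMessage(topic, newValue);
    }
}

/** Hypothetical chain of single-message transformers, applied in order. */
final class TransformChain {
    private final List<UnaryOperator<RoutedMessage>> steps;

    @SafeVarargs
    TransformChain(UnaryOperator<RoutedMessage>... steps) {
        this.steps = Arrays.asList(steps);
    }

    /** Returns the transformed message, or null once a step has routed it to the null topic. */
    RoutedMessage apply(RoutedMessage message) {
        RoutedMessage current = message;
        for (UnaryOperator<RoutedMessage> step : steps) {
            if (current == null || current.topic == null) {
                return null; // dropped: later transformers never see it
            }
            current = step.apply(current);
        }
        return (current == null || current.topic == null) ? null : current;
    }
}
```

A step such as `m -> m.value.isEmpty() ? m.withTopic("errors") : m` gives the dead-letter behaviour Gwen describes, while `m -> m.withTopic(null)` is the explicit opt-in to dropping. Emitting additional messages from one transformer, the open question in the thread, is deliberately not covered by this sketch.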
[jira] [Created] (KAFKA-3969) kafka.admin.ConsumerGroupCommand doesn't show consumer groups
Dieter Plaetinck created KAFKA-3969:
---

             Summary: kafka.admin.ConsumerGroupCommand doesn't show consumer groups
                 Key: KAFKA-3969
                 URL: https://issues.apache.org/jira/browse/KAFKA-3969
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.10.0.0
            Reporter: Dieter Plaetinck


http://kafka.apache.org/documentation.html, at http://kafka.apache.org/documentation.html#basic_ops_consumer_lag, says:

"Note, however, after 0.9.0, the kafka.tools.ConsumerOffsetChecker tool is deprecated and you should use the kafka.admin.ConsumerGroupCommand (or the bin/kafka-consumer-groups.sh script) to manage consumer groups, including consumers created with the new consumer API."

I'm sure that I have a consumer running, because I wrote an app that is processing data, and I can see the data as well as the metrics that confirm it's receiving data. I'm using Kafka 0.10.

Yet when I run the command as instructed, it doesn't list any consumer groups:

$ /opt/kafka_2.11-0.10.0.0/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper localhost:2181 --list
$

So either something is wrong with the tool, or with the docs.
[jira] [Commented] (KAFKA-3961) broker sends malformed response when switching from no compression to snappy/gzip
[ https://issues.apache.org/jira/browse/KAFKA-3961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15380940#comment-15380940 ]

Dieter Plaetinck commented on KAFKA-3961:
-

So I noticed I don't even have to switch from no compression to compression. I can just start fresh and send gzip/snappy data straight away to trigger the issue. If I use no compression it works fine.

The output of the script looks good to me. Here's the output for all 3 cases (none, gzip and snappy).

docker exec -t -i $(docker ps | grep raintank_kafka_1 | cut -d' ' -f1) /opt/kafka_2.11-0.10.0.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mdm-0/.log | head
Dumping /tmp/kafka-logs/mdm-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 3280747185 keysize: 8
offset: 1 position: 221 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 3546883992 keysize: 8
offset: 2 position: 442 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 3572906062 keysize: 8
offset: 3 position: 663 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 397468137 keysize: 8
offset: 4 position: 884 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 2327265879 keysize: 8
offset: 5 position: 1105 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 693965344 keysize: 8
offset: 6 position: 1326 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 1844153547 keysize: 8
offset: 7 position: 1547 isvalid: true payloadsize: 179 magic: 1 compresscodec: NoCompressionCodec crc: 1888993996 keysize: 8

docker exec -t -i $(docker ps | grep raintank_kafka_1 | cut -d' ' -f1) /opt/kafka_2.11-0.10.0.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mdm-0/.log | head
Dumping /tmp/kafka-logs/mdm-0/.log
Starting offset: 0
offset: 1 position: 0 isvalid: true payloadsize: 292 magic: 1 compresscodec: SnappyCompressionCodec crc: 7281723
offset: 9 position: 326 isvalid: true payloadsize: 689 magic: 1 compresscodec: SnappyCompressionCodec crc: 3347985156
offset: 10 position: 1049 isvalid: true payloadsize: 214 magic: 1 compresscodec: SnappyCompressionCodec crc: 4000746891
offset: 19 position: 1297 isvalid: true payloadsize: 726 magic: 1 compresscodec: SnappyCompressionCodec crc: 1610492591
offset: 20 position: 2057 isvalid: true payloadsize: 214 magic: 1 compresscodec: SnappyCompressionCodec crc: 3804305430
offset: 29 position: 2305 isvalid: true payloadsize: 731 magic: 1 compresscodec: SnappyCompressionCodec crc: 595144250
offset: 30 position: 3070 isvalid: true payloadsize: 214 magic: 1 compresscodec: SnappyCompressionCodec crc: 4231320622
offset: 39 position: 3318 isvalid: true payloadsize: 731 magic: 1 compresscodec: SnappyCompressionCodec crc: 2800209123

docker exec -t -i $(docker ps | grep raintank_kafka_1 | cut -d' ' -f1) /opt/kafka_2.11-0.10.0.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mdm-0/.log | head
Dumping /tmp/kafka-logs/mdm-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 200 magic: 1 compresscodec: GZIPCompressionCodec crc: 314894844
offset: 9 position: 234 isvalid: true payloadsize: 612 magic: 1 compresscodec: GZIPCompressionCodec crc: 2990244273
offset: 11 position: 880 isvalid: true payloadsize: 260 magic: 1 compresscodec: GZIPCompressionCodec crc: 3785043074
offset: 19 position: 1174 isvalid: true payloadsize: 551 magic: 1 compresscodec: GZIPCompressionCodec crc: 2341216064
offset: 21 position: 1759 isvalid: true payloadsize: 261 magic: 1 compresscodec: GZIPCompressionCodec crc: 3850584842
offset: 29 position: 2054 isvalid: true payloadsize: 556 magic: 1 compresscodec: GZIPCompressionCodec crc: 2303391279
offset: 31 position: 2644 isvalid: true payloadsize: 260 magic: 1 compresscodec: GZIPCompressionCodec crc: 2729038381
offset: 39 position: 2938 isvalid: true payloadsize: 557 magic: 1 compresscodec: GZIPCompressionCodec crc: 1498839171

> broker sends malformed response when switching from no compression to snappy/gzip
> -
>
>                 Key: KAFKA-3961
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3961
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.0
>         Environment: docker container java:openjdk-8-jre on arch linux 4.5.4-1-ARCH
>            Reporter: Dieter Plaetinck
>
> Hi this is my first time using this tracker, so please bear with me (priority seems to be major by default?)
> I should be allowed to switch back and forth betwe
[jira] [Updated] (KAFKA-3960) Committed offset not set after first assign
[ https://issues.apache.org/jira/browse/KAFKA-3960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Romanchuk updated KAFKA-3960:

    Description:
Committed offset did not set after first assign. Here it is minimal example (scala):
{code}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "client1")
props.put("group.id", "client1")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

import scala.collection.JavaConversions._

def dumpPositionAndCommitted() = {
  consumer.assignment().foreach { tp =>
    println(tp)
    println(s"Position - ${consumer.position(tp)}")
    println(s"Committed - ${consumer.committed(tp)}")
  }
  println("---")
}

consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
dumpPositionAndCommitted()
Thread.sleep(3000)

val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
consumer.assign(ps)
dumpPositionAndCommitted()
Thread.sleep(3000)
dumpPositionAndCommitted()
{code}
and the result is
{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
{noformat}
Pay attention to
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}
There is no committed offset fetched from broker, but it is. Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} during assign the partition

  was:
Committed offset did not set after first assign.
Here it is minimal example (scala):
{code}
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "client1")
props.put("group.id", "client1")
props.put("enable.auto.commit", "false")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)

import scala.collection.JavaConversions._

def dumpPositionAndCommitted() = {
  consumer.assignment().foreach { tp =>
    println(tp)
    println(s"Position - ${consumer.position(tp)}")
    println(s"Committed - ${consumer.committed(tp)}")
  }
  println("---")
}

consumer.assign(Collections.singleton(new TopicPartition("topic", 0)))
dumpPositionAndCommitted()
Thread.sleep(3000)

val ps = Collections.singleton(new TopicPartition("topic", 1)) ++ consumer.assignment()
consumer.assign(ps)
dumpPositionAndCommitted()
Thread.sleep(3000)
dumpPositionAndCommitted()
{code}
and the result is
{noformat}
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
proto7_fraud-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
topic-1
Position - 1262864347
Committed - null
topic-0
Position - 1211046445
Committed - OffsetAndMetadata{offset=1211046445, metadata=''}
---
{noformat}
Pay attention to
{noformat}
topic-1
Position - 1262864347
Committed - null
{noformat}
There is no committed offset fetched from broker, but it is. Looks like we should set {{needsFetchCommittedOffsets}} to {{true}} during assign the partition


> Committed offset not set after first assign
> ---
>
>                 Key: KAFKA-3960
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3960
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.0
>            Reporter: Alexey Romanchuk
>            Priority: Blocker
>
> Committed offset did not set after first assign. Here it is minimal example (scala):
> {code}
> val props = new Properties()
> props.put("bootstrap.servers", "localhost:9092")
> props.put("client.id", "client1")
> props.put("group.id", "client1")
> props.put("enable.auto.commit", "false")
> props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
> val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
> import scala.collection.JavaConversions._
> def dumpPositionAndCommitted() = {
>