Jay, we have also observed CRC corruption occasionally. I reported it in a thread and asked how we should handle certain error conditions in the old high-level consumer.
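For reference, the workaround we use today looks roughly like the sketch
below. I am writing this from memory, so treat the exception type
(kafka.message.InvalidMessageException) and exactly where it is thrown as
assumptions rather than details of our real code:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.message.InvalidMessageException;
    import kafka.message.MessageAndMetadata;

    public class SkipOnCrcFailure {
        // Drains one stream from the old high-level consumer, skipping any
        // message whose CRC check fails instead of letting the thread die.
        static void consume(KafkaStream<byte[], byte[]> stream) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (true) {
                try {
                    // Depending on the consumer version, the corruption may
                    // surface from hasNext() rather than next(), so both
                    // calls sit inside the try block.
                    if (!it.hasNext())
                        break;
                    MessageAndMetadata<byte[], byte[]> record = it.next();
                    handle(record.message());
                } catch (InvalidMessageException e) {
                    // Assumed CRC-failure exception: there is no API to
                    // re-fetch the failed offset, so all we can do is log
                    // the failure and move on.
                    System.err.println("Skipping corrupt message: " + e);
                }
            }
        }

        static void handle(byte[] payload) { /* application logic */ }
    }

The ugly part is that after such a failure the iterator is not guaranteed
to be in a usable state, which is exactly why we would like first-class
skip/re-fetch behavior in the consumer itself.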
On Mon, Feb 9, 2015 at 11:36 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:

> Hi Jay,
>
> 1) Sorry to get back to you so late. It is a CRC check error on any
> consumer thread, regardless of the server. What happens now is that I
> have to catch this exception and skip the message; there is no option to
> re-fetch it. Is there any way to add behavior to the Java consumer to
> re-fetch the offset that failed the CRC check?
>
> 2) Secondly, can you please add default behavior to automatically set
> 'fetch.message.max.bytes' to the broker's message.max.bytes? This would
> ensure smooth configuration for both the simple and high-level consumers
> and take the burden of configuring this property away from the Kafka
> user. We had a lag issue and dropped messages on the Camus side due to
> this misconfiguration (Camus has a different setting for the simple
> consumer). It would be great to auto-configure this if the user did not
> supply it.
>
> Let me know if you agree with #2.
>
> Thanks,
>
> Bhavesh
>
> On Mon, Jan 12, 2015 at 9:25 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Bhavesh,
> >
> > This seems like a serious issue and not one anyone else has reported.
> > I don't know what you mean by corrupt message -- are you saying the
> > CRC check fails? If so, that check is done both by the broker (prior
> > to appending to the log) and by the consumer, so that implies either a
> > bug in the broker or else disk corruption on the server.
> >
> > I do have an option to disable the CRC check in the consumer, though
> > depending on the nature of the corruption that can just lead to more
> > serious errors (depending on what is corrupted).
> >
> > -jay
> >
> > On Sun, Jan 11, 2015 at 11:00 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com> wrote:
> >
> > > Hi Jay,
> > >
> > > One of the pain points of the existing consumer code is the
> > > occasional CORRUPT_MESSAGE. Right now it is hard to pinpoint the
> > > cause of a CORRUPT_MESSAGE, especially when it happens on the Mirror
> > > Maker side. Is there any proposal to auto-skip corrupted messages
> > > and to have reporting visibility into CRC errors (metrics,
> > > traceability to find the corruption, per-topic breakdowns, etc.)? I
> > > am not sure this is the correct email thread to address this; if
> > > not, please let me know.
> > >
> > > Will provide feedback about the new consumer API and changes.
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Sun, Jan 11, 2015 at 7:57 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > I uploaded an updated version of the new consumer client (
> > > > https://issues.apache.org/jira/browse/KAFKA-1760). This is now
> > > > almost feature complete, and has pretty reasonable testing and
> > > > metrics. I think it is ready for review and could be checked in
> > > > once 0.8.2 is out.
> > > >
> > > > For those who haven't been following, this is meant to be a new
> > > > consumer client, like the new producer in 0.8.2, and is intended
> > > > to replace the existing "high level" and "simple" Scala consumers.
> > > >
> > > > This still needs the server-side implementation of partition
> > > > assignment and group management to be fully functional. I have
> > > > just stubbed this out in the server to allow the implementation
> > > > and testing of the client, but actual usage will require it.
> > > > However, the client that exists now is a fully functional
> > > > replacement for the "simple consumer" that is vastly easier to use
> > > > correctly, as it internally does all the discovery and failover.
> > > >
> > > > It would be great if people could take a look at this code, and
> > > > particularly at the public APIs, which have several small changes
> > > > from the original proposal.
> > > >
> > > > Summary
> > > >
> > > > What's there:
> > > > 1. Simple consumer functionality
> > > > 2. Offset commit and fetch
> > > > 3. Ability to change position with seek
> > > > 4. Ability to commit all or just some offsets
> > > > 5. Controller discovery, failure detection, heartbeat, and
> > > > fail-over
> > > > 6. Controller partition assignment
> > > > 7. Logging
> > > > 8. Metrics
> > > > 9. Integration tests, including tests that simulate random broker
> > > > failures
> > > > 10. Integration into the consumer performance test
> > > >
> > > > Limitations:
> > > > 1. There could be some lingering bugs in the group management
> > > > support; it is hard to test fully with just the stub support on
> > > > the server, so I think we'll need to get the server side working
> > > > to do better.
> > > > 2. I haven't implemented wild-card subscriptions yet.
> > > > 3. No integration with the console consumer yet.
> > > >
> > > > Performance
> > > >
> > > > I did some performance comparison with the old consumer over
> > > > localhost on my laptop. Usually localhost isn't good for testing,
> > > > but in this case it is useful because it has near-infinite
> > > > bandwidth, so it does a good job of exposing inefficiencies that
> > > > would be hidden by a slower network. These numbers probably aren't
> > > > representative of what you would get over a real network, but they
> > > > help bring out the relative efficiencies. Here are the results:
> > > > - Old high-level consumer: 213 MB/sec
> > > > - New consumer: 225 MB/sec
> > > > - Old simple consumer: 242 MB/sec
> > > >
> > > > It may be hard to get this client up to the same point as the
> > > > simple consumer, since the simple consumer does very little beyond
> > > > allocating and wrapping byte buffers that it reads off the
> > > > network.
> > > >
> > > > The big thing that shows up in profiling is the buffer allocation
> > > > for reading data, so one speed-up would be to pool these buffers.
> > > >
> > > > Some things to discuss
> > > >
> > > > 1. What should the behavior of consumer.position() and
> > > > consumer.committed() be immediately after initialization (prior to
> > > > calling poll)? Currently these methods just fetch the current
> > > > value from memory, but if the position isn't in memory the client
> > > > will try to fetch it from the server, and if no position is found
> > > > it will use the auto-offset-reset policy to pick one. I think this
> > > > is the right thing to do, because otherwise you can't guarantee
> > > > how many calls to poll() would be required before initialization
> > > > is complete. But it is kind of weird.
> > > > 2. Overall code structure improvement. These NIO network clients
> > > > tend to be very imperative in nature. I'm not sure this is bad,
> > > > but if anyone has ideas on improving the code I'd love to hear
> > > > them.
> > > >
> > > > -Jay
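Coming back to two of the points above with concrete sketches.

On Bhavesh's point #2: until that kind of auto-configuration exists, the
only safe practice seems to be pinning the consumer fetch size to the
broker's limit by hand. A minimal example (the ZooKeeper host, group id,
and the 10 MB value are placeholders; the point is just that
fetch.message.max.bytes must be >= the broker's message.max.bytes):

    java.util.Properties consumerProps = new java.util.Properties();
    consumerProps.put("zookeeper.connect", "zk1:2181"); // placeholder
    consumerProps.put("group.id", "my-group");          // placeholder
    // If this is smaller than the broker's message.max.bytes, the consumer
    // stalls on the first oversized message and just appears to lag forever.
    consumerProps.put("fetch.message.max.bytes", "10485760"); // e.g. 10 MB

And on discussion point 1: here is a minimal sketch of how I understand
the proposed new consumer API would be used, based on the javadoc
proposal. The exact signatures (poll's return type, the commit call) may
differ in the latest patch, so treat this as illustrative only:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewConsumerSketch {
        public static void main(String[] args) {
            Properties config = new Properties();
            config.put("bootstrap.servers", "broker1:9092"); // placeholder
            config.put("group.id", "test-group");            // placeholder
            // Policy applied when no committed position can be found on
            // the server (the auto-offset-reset case from point 1):
            config.put("auto.offset.reset", "earliest");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
            consumer.subscribe("my-topic");

            // Per point 1: calling consumer.position() or
            // consumer.committed() here, before the first poll(), may force
            // a fetch from the server or fall back to auto.offset.reset.
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                // process records, then commit the consumed offsets;
                // 'true' requests a synchronous commit in the proposal
                consumer.commit(true);
            }
        }
    }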