Hi, I've finally fixed this by closing the connection on timeout and creating a new connection on the next send.
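In outline, the fix looks like the sketch below. This is a minimal sketch only: it uses plain blocking sockets instead of netty to stay short, and the class and helper names are illustrative rather than the actual client code.

import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Sketch of the fix: a fetch whose read times out poisons the connection,
// so we close it and let the next send open a fresh one.
public class ReconnectingFetcher {
    private final String host;
    private final int port;
    private final int readTimeoutMs;
    private Socket socket; // null whenever we are disconnected

    public ReconnectingFetcher(String host, int port, int readTimeoutMs) {
        this.host = host;
        this.port = port;
        this.readTimeoutMs = readTimeoutMs;
    }

    private Socket connect() throws IOException {
        if (socket == null || socket.isClosed()) {
            socket = new Socket(host, port);    // new connection on the next send
            socket.setSoTimeout(readTimeoutMs); // bound all blocking reads
        }
        return socket;
    }

    // Sends one size-prefixed wire request; returns the response body,
    // or null if the read timed out (in which case the socket is dropped).
    public byte[] fetch(byte[] request) throws IOException {
        Socket s = connect();
        try {
            s.getOutputStream().write(request);
            s.getOutputStream().flush();
            DataInputStream in = new DataInputStream(s.getInputStream());
            int size = in.readInt();            // Kafka responses are size-prefixed
            byte[] body = new byte[size];
            in.readFully(body);
            return body;
        } catch (SocketTimeoutException e) {
            s.close();                          // never reuse a timed-out connection
            socket = null;
            return null;
        }
    }
}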
Thanks,
Gerrit

On Tue, Jan 14, 2014 at 10:20 AM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> Hi,
>
> thanks, I will do this.
>
> On Tue, Jan 14, 2014 at 9:51 AM, Joe Stein <joe.st...@stealth.ly> wrote:
> > Hi Gerrit, do you have a ticket for this issue already? Is it possible
> > to attach code that reproduces it? It would be great if you could run
> > it against a Kafka VM: you can grab one from
> > https://github.com/stealthly/scala-kafka for 0.8.0 and add whatever you
> > need to it to reproduce the issue, or use
> > https://issues.apache.org/jira/browse/KAFKA-1173 for 0.8.1. If you can
> > reproduce it comfortably in a controlled, isolated environment, that
> > would be helpful for folks working towards a resolution. At the least,
> > if it is a bug, we can get a detailed capture of what the bug is in the
> > JIRA ticket and start discussing how to fix it.
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> > On Tue, Jan 14, 2014 at 3:38 AM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> > > Yes, I'm using my own client following:
> > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > >
> > > Everything works except for this weirdness.
> > >
> > > On Tue, Jan 14, 2014 at 5:50 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > So, you implemented your own consumer client using netty?
> > > >
> > > > Thanks,
> > > > Jun
> > > >
> > > > On Mon, Jan 13, 2014 at 8:42 AM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> > > > > I'm using netty with async write and read.
> > > > > For reads I use a timeout such that if I do not see anything on
> > > > > the read channel, my read function times out and returns null.
> > > > > I do not see any error on the socket, and the same socket is used
> > > > > throughout all of the fetches.
> > > > >
> > > > > I'm using the console producer, and the messages are "11", "22",
> > > > > "abc", "iiii", etc.
> > > > >
> > > > > I can reliably reproduce it every time.
> > > > >
> > > > > It's weird, yes: no compression is used, and the timeout happens
> > > > > in the same scenario every time.
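The read-timeout behaviour described in the quoted message above (time out the read and return null when nothing arrives on the channel) can be wired up in netty 4 roughly as in the sketch below. The handler and method names and the 10-second value are illustrative, since the actual client code was not posted to the list; framing of the size-prefixed response is omitted for brevity.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.util.concurrent.CompletableFuture;

// Handles one pending fetch: completed with the response bytes, or with
// null when nothing is read from the channel before the timeout fires.
public final class FetchResponseHandler extends ChannelInboundHandlerAdapter {
    private final CompletableFuture<byte[]> pending = new CompletableFuture<>();

    public void install(ChannelPipeline pipeline) {
        pipeline.addLast(new ReadTimeoutHandler(10)); // fire if no read for 10s
        pipeline.addLast(this);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        buf.release();
        pending.complete(bytes);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof ReadTimeoutException) {
            pending.complete(null); // read timed out: surface null to the caller
        } else {
            pending.completeExceptionally(cause);
        }
    }

    // Blocking wait used by the consumer loop; null signals a timeout.
    public byte[] awaitResponse() throws Exception {
        return pending.get();
    }
}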
> > > > > On Mon, Jan 13, 2014 at 4:44 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > I can't seem to find the log trace for the timed-out fetch
> > > > > > request (every fetch request seems to have a corresponding
> > > > > > completed entry). For the timed-out fetch request, is it that
> > > > > > the broker never completed the request, or that it just took
> > > > > > longer than the socket timeout to finish processing the
> > > > > > request? Do you use large messages in your test?
> > > > > >
> > > > > > If you haven't enabled compression, it's weird that you would
> > > > > > re-get 240 and 241 with an offset of 242 in the fetch request.
> > > > > > Is that easily reproducible?
> > > > > >
> > > > > > Thanks,
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Jan 13, 2014 at 1:26 AM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > The offset in g is 240 and in i it is 242; the last message
> > > > > > > read was at offset 239.
> > > > > > >
> > > > > > > After reading from 0 to 239, I make another request for 240;
> > > > > > > this request times out and never returns. I then manually add
> > > > > > > 2 entries via the console producer, all the while making a
> > > > > > > request for offset 240 every 10 seconds. All subsequent
> > > > > > > requests for offset 240 return empty message sets until the
> > > > > > > messages are written. Then I get the 2 messages at offsets
> > > > > > > 240 and 241 and an end of response. Then I make a request for
> > > > > > > offset 242 and get the messages at offsets 240 and 241 again.
> > > > > > >
> > > > > > > I've attached a portion of the kafka-request.log set to
> > > > > > > trace.
> > > > > > >
> > > > > > > The correlation ids are:
> > > > > > > 1389604489 - first request, at offset 0
> > > > > > > 1389604511 - the timed-out request, at offset 240
> > > > > > > 1389604563 - the request at offset 240 that returned data
> > > > > > > 1389604573 - the request at offset 242 that returned duplicates
> > > > > > >
> > > > > > > Regards,
> > > > > > > Gerrit
> > > > > > >
> > > > > > > On Mon, Jan 13, 2014 at 5:10 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > What are the offsets used in the fetch requests in steps g
> > > > > > > > and i that both returned offsets 10 and 11?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Sat, Jan 11, 2014 at 3:19 AM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > No, the offsets are not the same. I've printed out the
> > > > > > > > > values to check this, and it's not the case.
> > > > > > > > >
> > > > > > > > > On Fri, Jan 10, 2014 at 5:02 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > Are the offsets used in the 2 fetch requests the same?
> > > > > > > > > > If so, you will get the same messages twice. Your
> > > > > > > > > > consumer is responsible for advancing the offsets after
> > > > > > > > > > consumption.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Jun
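To make Jun's point just above concrete, consumer-side offset advancement looks roughly like the sketch below. MessageAndOffset is an illustrative stand-in for whatever the custom client decodes out of a fetch response; it is not a class from this thread.

import java.util.List;

public final class OffsetAdvance {
    // Illustrative decoded message; a real client builds these while
    // parsing the message set in the fetch response.
    public static final class MessageAndOffset {
        public final long offset;
        public final byte[] payload;
        public MessageAndOffset(long offset, byte[] payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Processes one fetched message set and returns the offset the next
    // fetch request must use; re-sending an old offset makes the broker
    // serve the same messages again.
    public static long consume(List<MessageAndOffset> messageSet, long currentOffset) {
        long next = currentOffset;
        for (MessageAndOffset m : messageSet) {
            // ... hand m.payload to the application here ...
            next = m.offset + 1; // advance past the consumed message
        }
        return next;
    }
}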
> > > > > > > > > > On Thu, Jan 9, 2014 at 1:00 PM, Gerrit Jansen van Vuuren <gerrit...@gmail.com> wrote:
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I'm writing a custom consumer for kafka 0.8.
> > > > > > > > > > > Everything works except for the following:
> > > > > > > > > > >
> > > > > > > > > > > a. connect, send fetch, read all results
> > > > > > > > > > > b. send fetch
> > > > > > > > > > > c. send fetch
> > > > > > > > > > > d. send fetch
> > > > > > > > > > > e. via the console producer, publish 2 messages
> > > > > > > > > > > f. send fetch :corr-id 1
> > > > > > > > > > > g. read 2 messages published :offsets [10 11] :corr-id 1
> > > > > > > > > > > h. send fetch :corr-id 2
> > > > > > > > > > > i. read 2 messages published :offsets [10 11] :corr-id 2
> > > > > > > > > > > j. send fetch ...
> > > > > > > > > > >
> > > > > > > > > > > The problem is that I get the messages sent twice, as
> > > > > > > > > > > responses to two separate fetch requests. The
> > > > > > > > > > > correlation ids are distinct, so it cannot be that I
> > > > > > > > > > > read the same response twice. The offsets of the 2
> > > > > > > > > > > messages are the same, so they are duplicates, and
> > > > > > > > > > > it's not the producer sending the messages twice.
> > > > > > > > > > >
> > > > > > > > > > > Note: the same connection is kept open the whole
> > > > > > > > > > > time, and I send, block on receive, then send again.
> > > > > > > > > > > After the first 2 messages are read, the offsets are
> > > > > > > > > > > incremented, and the next fetch asks kafka for
> > > > > > > > > > > messages from the new offsets.
> > > > > > > > > > >
> > > > > > > > > > > Any ideas why kafka would send the messages again on
> > > > > > > > > > > the second fetch request?
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Gerrit
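For completeness, the polling loop from steps a-j, combined with the eventual fix from the top of the thread, would look roughly like the fragment below. It assumes the ReconnectingFetcher and OffsetAdvance sketches above; buildFetchRequest and decode are hypothetical names standing in for the wire-format encoding and decoding described in the protocol guide linked earlier.

// Fetch, consume, advance the offset, repeat. A null response means the
// read timed out and the connection was dropped; the next fetch()
// transparently reconnects.
long offset = 0L;
int correlationId = 0;
ReconnectingFetcher fetcher = new ReconnectingFetcher("broker-host", 9092, 10_000);
while (true) {
    byte[] request = buildFetchRequest("topic", 0, offset, ++correlationId); // hypothetical encoder
    byte[] response = fetcher.fetch(request);
    if (response == null) {
        continue; // timed out; retry the same offset on a fresh connection
    }
    List<OffsetAdvance.MessageAndOffset> messages = decode(response); // hypothetical decoder
    offset = OffsetAdvance.consume(messages, offset);
}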