Hi Matthew, I may be doing something wrong.
I cloned the code at
https://github.com/apache/kafka/tree/trunk/contrib/hadoop-consumer
and I am running the following:

- ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties

  This generates a /tmp/kafka/data/1.dat file containing:

    Dump tcp://localhost:9092 atlas-topic1 0 -1 to /tmp/kafka/data/1.dat

- ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties

  It says:

    Using offset range [0, 42649]
    Connected to node tcp://localhost:9092 beginning reading at offset 0
    latest offset=42649

When I run ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob
test/test.properties again, it says:

    Using offset range [0, 42759]
    Connected to node tcp://localhost:9092 beginning reading at offset 0
    latest offset=42759

So every run begins reading at offset 0 instead of resuming from the
previous run's latest offset.

My test.properties uses the local file system for input/output, for
testing:

kafka.etl.topic=topic1
hdfs.default.classpath.dir=/tmp/kafka/lib
event.count=1000
hadoop.job.ugi=kafka,hadoop
kafka.server.uri=tcp://localhost:9092
input=/tmp/kafka/data
output=/tmp/kafka/output
kafka.request.limit=-1
client.buffer.size=1048576
client.so.timeout=60000

Pratyush
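For illustration, here is a minimal sketch of the idea discussed below:
feeding the last consumed offset back into the job's input instead of -1
("start from the beginning"). The request format (node URI, topic,
partition, start offset) is taken from the Dump log line above. Whether
SimpleKafkaETLJob accepts a plain-text request file or expects a
SequenceFile of KafkaETLRequest records is something to verify against the
contrib code, and readLastOffset() is a hypothetical helper, so treat this
as a sketch of the approach rather than a drop-in fix.

// Sketch: regenerate the request file so the next run resumes from the
// last offset instead of -1. Values mirror test.properties above.
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

public class ResumeRequestWriter {

    public static void main(String[] args) throws IOException {
        String nodeUri  = "tcp://localhost:9092"; // kafka.server.uri
        String topic    = "topic1";               // kafka.etl.topic
        int partition   = 0;
        long lastOffset = readLastOffset();       // e.g. 42759 after the second run

        PrintWriter out = new PrintWriter(new FileWriter("/tmp/kafka/data/1.dat"));
        try {
            // Same shape as the generated request: node URI, topic, partition, offset.
            out.printf("%s %s %d %d%n", nodeUri, topic, partition, lastOffset);
        } finally {
            out.close();
        }
    }

    // Hypothetical helper: read back the last offset the previous job run
    // recorded, however and wherever it was stored.
    private static long readLastOffset() {
        return 42759L; // placeholder
    }
}

If that substitution works, it could be scripted between runs so each
SimpleKafkaETLJob invocation only reads messages that arrived since the
previous run.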
On Fri, Dec 28, 2012 at 8:02 PM, Matthew Rathbone <matt...@foursquare.com> wrote:

> So the hadoop consumer does use the latest offset; it reads it from the
> 'input' directory in the record reader.
>
> We have a heavily modified version of the hadoop consumer that reads /
> writes offsets to zookeeper [much like the scala consumers], and this
> works great.
>
> FWIW we also use the hadoop consumer to write to S3 without any issues,
> much like any ordinary mapreduce job, and it's pretty solid. We run our
> job every 10-30 minutes.
>
> Maybe also interesting is that we used to use Flume [0.9], and we find
> the kafka method of consuming to be much better during s3 networking
> issues. With flume, if you 'push' to s3 but something goes wrong, it can
> fall over and you can fairly easily lose data; with the hadoop kafka
> consumer the mapper just fails over and tries again, which is a little
> wasteful (you're reading the records twice) but generally great.
>
> On Fri, Dec 28, 2012 at 1:56 PM, Pratyush Chandra
> <chandra.praty...@gmail.com> wrote:
>
> > I went through the source code of the Hadoop consumer in contrib. It
> > doesn't seem to be using the previous offset at all, neither in the
> > DataGenerator nor in the map-reduce stage.
> >
> > Before I go into the implementation, I can think of 2 ways:
> > 1. A ConsumerConnector receiving all the messages continuously and
> > then writing them to HDFS (in this case S3). The problem is that
> > autocommit is handled internally, and there is no handler function
> > invoked while committing the offset which could be used to upload the
> > file.
> > 2. Wake up every minute, pull all the data using a SimpleConsumer into
> > a local file, and put it to HDFS.
> >
> > So, what is the better approach?
> > - Listen continuously vs. in batch
> > - Use ConsumerConnector (where auto commit/offsets are handled
> > internally) vs. SimpleConsumer (which does not use ZK, so I need to
> > connect to each broker individually)
> >
> > Pratyush
> >
> > On Thu, Dec 27, 2012 at 8:38 PM, David Arthur <mum...@gmail.com> wrote:
> >
> > > I don't think anything exists like this in Kafka (or contrib), but
> > > it would be a useful addition! Personally, I have written this exact
> > > thing at previous jobs.
> > >
> > > As for the Hadoop consumer, since there is a FileSystem
> > > implementation for S3 in Hadoop, it should be possible. The Hadoop
> > > consumer works by writing out data files containing the Kafka
> > > messages alongside offset files which contain the last offset read
> > > for each partition. If it is re-consuming from zero each time you
> > > run it, it means it's not finding the offset files from the previous
> > > run.
> > >
> > > Having used it a bit, the Hadoop consumer is certainly an area that
> > > could use improvement.
> > >
> > > HTH,
> > > David
> > >
> > > On 12/27/12 4:41 AM, Pratyush Chandra wrote:
> > >
> > >> Hi,
> > >>
> > >> I am looking for an S3-based consumer which can write all the
> > >> received events to an S3 bucket (say, every minute), something
> > >> similar to the Flume HDFS sink:
> > >> http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
> > >> I have tried evaluating the hadoop-consumer in the contrib folder,
> > >> but it seems to be more for offline processing, which will fetch
> > >> everything from offset 0 at once and replace it in the S3 bucket.
> > >> Any help would be appreciated.
> > >>
> >
> > --
> > Pratyush Chandra
>
> --
> Matthew Rathbone
> Foursquare | Software Engineer | Server Engineering Team
> matt...@foursquare.com | @rathboma <http://twitter.com/rathboma> |
> 4sq <http://foursquare.com/rathboma>

--
Pratyush Chandra
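For reference, the ZooKeeper-backed offset tracking Matthew describes above
might look roughly like the sketch below. The znode layout mirrors the
convention the high-level (Scala) consumer uses,
/consumers/<group>/offsets/<topic>/<brokerId>-<partition>; the group name is
made up, and the sketch assumes the parent znodes already exist. It is not
the contrib code or Foursquare's modified consumer.

import java.nio.charset.Charset;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkOffsetStore {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    private final ZooKeeper zk;
    private final String basePath; // e.g. /consumers/hadoop-etl/offsets/topic1

    public ZkOffsetStore(ZooKeeper zk, String group, String topic) {
        this.zk = zk;
        this.basePath = "/consumers/" + group + "/offsets/" + topic;
    }

    /** Returns the stored offset for a "brokerId-partition" node, or -1 if none exists yet. */
    public long readOffset(String brokerPartition) throws KeeperException, InterruptedException {
        String path = basePath + "/" + brokerPartition; // e.g. ".../0-0"
        if (zk.exists(path, false) == null) {
            return -1L; // nothing recorded yet: start from the beginning
        }
        byte[] data = zk.getData(path, false, new Stat());
        return Long.parseLong(new String(data, UTF8));
    }

    /** Records the last consumed offset so the next job run can resume from it. */
    public void writeOffset(String brokerPartition, long offset) throws KeeperException, InterruptedException {
        String path = basePath + "/" + brokerPartition;
        byte[] data = Long.toString(offset).getBytes(UTF8);
        if (zk.exists(path, false) == null) {
            // Assumes the parent path already exists; otherwise create it level by level first.
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            zk.setData(path, data, -1); // version -1 = unconditional update
        }
    }
}

A map task (or a step after the job completes) would call writeOffset()
with the last offset it read, and the next run would seed its requests from
readOffset() instead of defaulting to -1.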