You are right, it gives the count of messages stored in all topics from the
time the broker service (Kafka) is started, and that's why it is actually the
number of message requests coming to each broker. So whenever a broker service is
restarted, the message count is reset to zero.
Also this is the reason the msg count doesn't ge
Sorry to keep bugging the list, but I feel like I am either missing
something important, or I'm finding something wrong with the standard
consumer API (or maybe just the docs need some clarification).
I started to think that I should probably just accept at-least-once
semantics ... but I eventually
Hello Imran,
The offset will only be updated when the next() function is called:
override def next(): MessageAndMetadata[K, V] = {
...
*currentTopicInfo.resetConsumeOffset(consumedOffset)*
...
item
}
instead of in makeNext(), which will just update consumedOffset, but that
is n
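For the application side, a minimal sketch (not from the thread) with the 0.8
high-level Java consumer; the ZooKeeper address, group id and topic are
placeholders. The point is that hasNext() only fetches ahead, while the
consumed offset that commitOffsets() records is the one advanced when next()
hands the message back:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class OffsetAdvanceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "offset-sketch");           // placeholder
        props.put("auto.commit.enable", "false");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("stock-tick", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("stock-tick").get(0).iterator();

        while (it.hasNext()) {                                // fetches, but does not move the consumed offset
            MessageAndMetadata<byte[], byte[]> m = it.next(); // consumed offset advances here
            process(m.message());
            connector.commitOffsets();                        // records only what next() has already returned
            // (committing per message just to illustrate; in practice commit far less often)
        }
    }

    private static void process(byte[] payload) { /* application logic */ }
}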
On Thu, Nov 21, 2013 at 09:08:14PM +0530, Monika Garg wrote:
> Yes I have sent more than 1000 messages to stock-tick and 5 messages to
> each topic1 and sample-tick topics.
>
> But now I think I got the reason why the O/P at two places is different:
>
> The Mbeans were registered when I started
yes, that makes sense, thank you very much! phew, I was worried there ...
so, is it fair to say that
1) if you use auto-commit, it is possible for a msg to get completely
skipped by your app on restarts? (worst case, one per partition)
2) if you are committing offsets from some thread other tha
Yes I have sent more than 1000 messages to stock-tick and 5 messages to
each topic1 and sample-tick topics.
But now I think I got the reason why the O/P at the two places is different:
The MBeans were registered when I started the producer and gave messages to the
topics. After the producer was killed, still the MBean
Hi Tarang,
Could you check if there are any exceptions in the consumer logs when it
does not consume?
Guozhang
On Thu, Nov 21, 2013 at 5:17 AM, Tarang Dawer wrote:
> Hello
>
> I am running a kafka 0.8 consumer with the following configuration : -
>
> fetch.size=10
> zookeeper.session.t
Thanks Bruno
that fixed it..!!
On Thu, Oct 17, 2013 at 6:47 PM, Bruno D. Rodrigues <
bruno.rodrig...@litux.org> wrote:
> try this, not sure if it would help though
>
> props.put("auto.offset.reset", "smallest");
>
> On 17/10/2013, at 14:13, Tarang Dawer wrote:
>
> Hi All
>
> I am f
Hello
I am running a Kafka 0.8 consumer with the following configuration:
fetch.size=10
zookeeper.session.timeout.ms=6
auto.offset.reset=smallest
zookeeper.sync.time.ms=200
auto.commit.enable=false
I am incrementing the offsets manually.
While doing so, I am facing a problem
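For reference, a sketch (not Tarang's actual code) of how that configuration is
supplied to the 0.8 high-level Java consumer; the ZooKeeper address and group id
are placeholders, and the other values are simply copied from the message above
(the session-timeout value appears truncated by the archive):

import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder, not in the message
        props.put("group.id", "sample-group");             // placeholder, not in the message
        props.put("fetch.size", "10");                     // values below copied from the message
        props.put("zookeeper.session.timeout.ms", "6");    // as shown above (possibly truncated)
        props.put("auto.offset.reset", "smallest");        // the setting Bruno pointed to
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.enable", "false");          // offsets move only via commitOffsets()

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        // create streams and iterate as usual; with auto-commit disabled, nothing is
        // persisted until the application calls connector.commitOffsets() itself.
    }
}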
When it doesn't consume, which offset is in the fetch request? You can find
out from the request log in the broker.
Thanks,
Jun
On Thu, Nov 21, 2013 at 5:17 AM, Tarang Dawer wrote:
> Hello
>
> I am running a kafka 0.8 consumer with the following configuration : -
>
> fetch.size=10
> zoo
You likely need to use a custom offset solution if you plan on committing
every message. With many partitions this puts a large burden on ZooKeeper;
you end up needing to roll over your ZK transaction logs quickly as well, or
you risk filling up the disk.
On Thu, Nov 21, 2013 at 6:20 PM, Guozhang Wang wrote:
Hi team,
We have 3 brokers in a cluster. The replication factor is 2. I set the default
retention size to 3G bytes. I published 12G of data to a topic, which is enough
to fully load all partitions. I assume on each broker the partition size should
be 3G. However, it is only 1.4G for one partition.
sorry, one more thought --
I've realized that the difficulties are also because I'm trying to
guarantee read *exactly* once. The standard consumer group guarantees
read *at least* once (with more than one read happening for those
messages that get read, but then the process dies before the offset
Hi,
thanks again for the quick answer on this. However, though this
solution works, it is *really* complicated to get the user code
correct if you are reading data with more than 1 thread. Before I
begin processing one batch of records, I have to make sure all of the
workers reading from kafka s
1) Yes. And the worst case is that you may lose 1 message, but you may have
more than one duplicate per partition since the commit thread will call
commitOffsets() periodically.
2) I think locking is also fine, since it will just leave the thread
waiting on the lock, which is better than a spin-lock since
Hi Edward,
I think you misunderstand ... I definitely do *not* want to commit
every message. That would be much too expensive; every few minutes is
plenty for me.
I want to guarantee that if I commit a message, my message has
been processed by *my application* at least once. (In another th
Why not just disable autocommit and only call commitOffsets() after you've
processed a batch? It isn't obvious to me how doing so would allow a
message to be processed zero times.
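A sketch of that suggestion, assuming the 0.8 high-level Java consumer; the
batch size, topic, group id and ZooKeeper address are made up. Offsets are
committed only after the whole batch has been processed, so a crash before the
commit means the batch is re-read (at-least-once), never skipped:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "batch-commit-sketch");      // placeholder
        props.put("auto.commit.enable", "false");          // commit only where we say so

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        KafkaStream<byte[], byte[]> stream =
            connector.createMessageStreams(Collections.singletonMap("topic1", 1))
                     .get("topic1").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        final int batchSize = 100;                         // made-up batch size
        List<byte[]> batch = new ArrayList<byte[]>(batchSize);
        while (it.hasNext()) {
            batch.add(it.next().message());
            if (batch.size() >= batchSize) {
                processBatch(batch);                       // finish the work first...
                connector.commitOffsets();                 // ...then record the progress
                batch.clear();
            }
        }
    }

    private static void processBatch(List<byte[]> batch) { /* application logic */ }
}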
On Nov 21, 2013 5:52 PM, "Imran Rashid" wrote:
> Hi Edward,
>
> I think you misunderstand ... I definitely do *not*
Hi Benjamin,
great question, I thought that for a while too. I think that won't
work in my use case, but I could certainly use somebody else checking
my logic.
I think that would work, *if* I was willing to allow arbitrary time to
pass between my commits. Then I could call commitOffsets within
Are you on the 0.8 branch or trunk? Also, what are the sizes of the other
partitions? And how many producers do you have? It could be because
the producer sticks to a partition for the metadata refresh period,
so if your test run isn't long enough, some partitions may be more
loaded than the others.
Hi Imran,
The thing to do is not to have an asynchronous background thread for
committing. Instead, have a time-based "shouldCommit()" function and
commit periodically, synchronously.
If you have a bursty producer, then you can set a timeout
(consumer.timeout.ms), so that your iter.hasNext() call
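A minimal sketch of that pattern (not Jason's actual code), assuming the 0.8
high-level Java consumer; the commit interval, timeout, topic, group id and
ZooKeeper address are all assumptions. consumer.timeout.ms makes hasNext()
throw ConsumerTimeoutException instead of blocking forever, so the loop can
still reach the synchronous commit when the producer goes quiet:

import java.util.Collections;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.javaapi.consumer.ConsumerConnector;

public class PeriodicSyncCommitSketch {
    private static final long COMMIT_INTERVAL_MS = 60000L; // assumed: roughly once a minute
    private static long lastCommit = System.currentTimeMillis();

    private static boolean shouldCommit() {
        return System.currentTimeMillis() - lastCommit >= COMMIT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // placeholder
        props.put("group.id", "periodic-commit-sketch");    // placeholder
        props.put("auto.commit.enable", "false");
        props.put("consumer.timeout.ms", "1000");           // hasNext() gives up after 1s of no data

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        ConsumerIterator<byte[], byte[]> it =
            connector.createMessageStreams(Collections.singletonMap("stock-tick", 1))
                     .get("stock-tick").get(0).iterator();

        while (true) {
            try {
                if (it.hasNext()) {
                    process(it.next().message());
                }
            } catch (ConsumerTimeoutException e) {
                // no message within consumer.timeout.ms; fall through and maybe commit anyway
            }
            if (shouldCommit()) {
                connector.commitOffsets();                  // synchronous, in the processing thread
                lastCommit = System.currentTimeMillis();
            }
        }
    }

    private static void process(byte[] payload) { /* application logic */ }
}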
Imran,
Remember too that different threads will always be processing a different
set of partitions. No two threads will ever own the same partition
simultaneously.
A consumer connector can own many partitions (split among its threads),
each with a different offset. So, yes, it is complicated, a
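A sketch of that layout with the 0.8 high-level Java consumer (the thread
count, topic, group id and ZooKeeper address are assumptions):
createMessageStreams hands back one KafkaStream per requested thread, each
stream sees a disjoint set of partitions, and a single commitOffsets() on the
connector records the current position of every stream at once, which is why
all threads need to be at a safe point before it is called:

import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MultiThreadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // placeholder
        props.put("group.id", "multi-thread-sketch");      // placeholder
        props.put("auto.commit.enable", "false");

        final int numThreads = 4;                          // assumed thread count
        final ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        List<KafkaStream<byte[], byte[]>> streams =
            connector.createMessageStreams(Collections.singletonMap("stock-tick", numThreads))
                     .get("stock-tick");

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            pool.submit(new Runnable() {
                public void run() {
                    // this thread only ever sees the partitions assigned to its stream
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        process(it.next().message());
                    }
                }
            });
        }
        // connector.commitOffsets() commits for every stream owned by this connector.
    }

    private static void process(byte[] payload) { /* application logic */ }
}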
Also, did you enable compression on the producer side?
Thanks,
Jun
On Thu, Nov 21, 2013 at 10:28 AM, Yu, Libo wrote:
> Hi team,
>
> We have 3 brokers in a cluster. The replication factor is 2. I set the
> default retention size to
> 3G bytes. I published 12G data to a topic, which is enough t
Hi Jason,
thank you so much!
I was missing the consumer.timeout.ms property. In fact I had
actually written a full solution using cyclic barriers to make sure
all my readers had stopped consuming ... but threw it out b/c I
thought I couldn't deal w/ iterators blocked in hasNext(). That
propert
Hi Jun
I tried the ConsumerOffsetChecker script for checking the offsets,
but there too I got an exception, with no information regarding the
offset. Although when I started the consumer for the second time, it consumed
the data, I still got the same response from the offset checker script.
Please find
Hi Guozhang
Didn't get any exceptions in the consumer log when the consumer doesn't
consume.
In the ZooKeeper logs I got info messages like:
[2013-11-22 11:30:04,607] INFO Got user-level KeeperException when
processing sessionid:0x1427e5fdbcc type:create cxid:0x92
zxid:0xfffe txntype:unk
Again,
Remember that if you have multiple threads processing, then it means they
are consuming from different partitions, so the offset of one partition
will not be relevant to another. So, you really wouldn't have a use case
to be committing specific offsets async. Furthermore, if your consumer