Hi all,
I have a question about a very suspicious behavior I see during
consuming messages using manual synchronous commit with Kafka 0.10.1.0.
The code looks something like this:
try (KafkaConsumer<...> consumer = ...) {
Map commitMap =
Collections.synchronizedMap(...);
while (!Thread.curre
Hi Ismael,
short answer at the end of this email. :)
On 02/02/2017 02:52 PM, Ismael Juma wrote:
I hadn't quite understood this. So the asynchronous code continues to
update the map, I see.
Regarding your suggested fix, the required change is not to hold a lock to
the map passed to `commitSync
Sorry, I replied a little too fast. It is true that my original suggestion
would not work and I agree that it would be better to copy the map in
`commitSync` (that's why I suggested it in my email). Please file a JIRA.
However, that doesn't solve your immediate issue. More inline.
On Thu, Feb 2, 2
I agree with Jan. KafkaConsumer should take a copy of the offsetMap, now
that there are multiple threads accessing the map. commitAsync already does
take a copy and when there was only one thread in the consumer, it was
reasonable to avoid cloning for commitSync. But now it makes sense to clone
for
I'd disagree that I can fix the issue as you suggest, because:
- if I remove the `Collections.synchronizedMap` from the `commitMap` I
get unsynchronized map and therefore the asynchronous writes to this map
would result in undefined state
- if I remove the manual synchronization then there
OK, you can fix this by removing `Collections.synchronizedMap` from the
following line or by removing the synchronized blocks.
Map commitMap =
Collections.synchronizedMap(...);
There is no reason to do manual and automatic synchronization at the same
time in this case. Because `Collections.syncho
Hi Ismael,
yes, no problem:
The following thread is the main thread interacting with the
KafkaConsumer (polling topic and committing offsets):
"pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x7f00f4434800 nid=0x32a9
runnable [0x7f00b6662000]
java.lang.Thread.State: RUNNABLE
a
Hi Jan,
Do you have stacktraces showing the issue? That would help. Also, if you
can test 0.10.1.1, which is the latest stable release, that would be even
better. From looking at the code briefly, I don't see where the consumer is
locking on the received offsets map, so not sure what would cause i
Hi all,
I have a question about a very suspicious behavior I see during
consuming messages using manual synchronous commit with Kafka 0.10.1.0.
The code looks something like this:
try (KafkaConsumer<...> consumer = ...) {
Map commitMap =
Collections.synchronizedMap(...);
while (!Thread.c