Did we create tickets?

On 11/15/19 9:32 AM, John Roesler wrote:
> Hi Navinder and Giridhar,
>
> Thanks for the clarification. I think the motivation is reasonable.
> Although, I'm still a little unsure how this condition occurs. In
> addition to what I mentioned before, I'd forgotten one other important
> detail: the "cleaner thread" should only run when the KafkaStreams
> state is RUNNING. That is, it shouldn't be able to "steal" a state
> directory while a rebalance is ongoing. Perhaps what is happening is
> that the cleaner checks the state before the rebalance, then the
> rebalance starts while the cleaner is iterating over the stores, so
> the task happens to unlock the directory just before the cleaner
> locks it?
>
> Entry point, in KafkaStreams:
>
>     stateDirCleaner.scheduleAtFixedRate(() -> {
>         // we do not use lock here since we only read on the value and act on it
>         if (state == State.RUNNING) {
>             stateDirectory.cleanRemovedTasks(cleanupDelay);
>         }
>     }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
>
> Perhaps instead of checking the state of Streams only _before_ looping
> over the tasks, we should check it:
> 1. before the loop (short circuit)
> 2. before grabbing the lock (again, short circuit)
> 3. after grabbing the lock (this is the critical one, to ensure we
>    never try to delete a directory during a rebalance)
>
> (3) guarantees correctness because tasks hold their locks the entire
> time they are running. By making sure that the state is still RUNNING
> after the cleaner acquires the lock, we can be sure that all tasks
> have been assigned and their locks are already held before we proceed
> to delete anything.
>
> ----
>
> In addition to that apparent race condition, if a task can get stuck
> indefinitely in "Created", that would also be a bug. AFAIK, the
> StreamThread should just keep attempting to transition all tasks to
> running; i.e., it should never leave a task behind. The alternative is
> that a task could raise a fatal exception and kill the StreamThread
> itself, which should then cause the whole KafkaStreams instance to
> shut down.
>
> I think that Giridhar has identified the problematic block, and
> actually, I see that this seems fixed as of 2.1:
>
>     if (active.allTasksRunning()) {
>         final Set<TopicPartition> assignment = consumer.assignment();
>         log.trace("Resuming partitions {}", assignment);
>         consumer.resume(assignment);
>         assignStandbyPartitions();
>         // in 1.1, this says "return true"
>         return standby.allTasksRunning();
>     }
>     return false;
>
> Is there any chance you could upgrade at least to 2.1 and see what you think?
>
> ----
>
> Regarding the comment about spamming the logs, this seems to be where
> we're handling the lock exception and just deferring until the next
> attempt at initializing the task. If we printed the log every time
> this happens, it would print at a very high rate during the rebalance,
> while the task is contending with the cleaner. But, since we _don't_
> actually try again in 1.1, I guess this comment is incorrect. However,
> it becomes true in 2.1, where we _do_ retry in the StreamThread.
>
> ----
>
> As for why the retries were removed, it _looks_ like they were removed
> in favor of retrying at a higher level. This makes sense, as
> busy-waiting on the lock itself would block further progress at the
> top level, potentially causing Streams to fail its heartbeat, fall out
> of the group, and cause another round of rebalances.
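A rough sketch of where the three RUNNING checks proposed above could sit in a cleaner loop is shown here. This is illustration only, not the actual StateDirectory code: the class name, the ConcurrentHashMap standing in for the per-directory locks, the lastModified-based staleness test, and the volatile running flag are all assumptions made for the example.

    import java.io.File;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only -- not the actual StateDirectory implementation.
    class StateDirCleanerSketch {
        private final File stateDir;
        // stands in for the per-task-directory locks that running tasks hold
        private final Map<String, Object> locks = new ConcurrentHashMap<>();
        // stands in for "the KafkaStreams state is RUNNING"
        private volatile boolean running = true;

        StateDirCleanerSketch(final File stateDir) {
            this.stateDir = stateDir;
        }

        void cleanRemovedTasks(final long cleanupDelayMs) {
            if (!running) {                          // (1) short circuit before the loop
                return;
            }
            final File[] taskDirs = stateDir.listFiles(File::isDirectory);
            if (taskDirs == null) {
                return;
            }
            for (final File taskDir : taskDirs) {
                if (!running) {                      // (2) short circuit before grabbing the lock
                    return;
                }
                // try to grab the directory lock; a live task that owns this
                // directory would already hold the entry, so we skip it
                if (locks.putIfAbsent(taskDir.getName(), Boolean.TRUE) != null) {
                    continue;
                }
                try {
                    final boolean stale =
                        System.currentTimeMillis() - taskDir.lastModified() > cleanupDelayMs;
                    // (3) re-check while holding the lock: if Streams is still RUNNING
                    // here, every assigned task already holds its own lock, so this
                    // directory cannot belong to a task that is about to be assigned
                    if (running && stale) {
                        taskDir.delete();            // a real cleaner would delete recursively
                    }
                } finally {
                    locks.remove(taskDir.getName());
                }
            }
        }
    }

The essential point is the re-check of the running flag after the lock is acquired, mirroring check (3) above.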
>
> Clearly, though, the "retry at the top level" part isn't actually
> functioning in your version of Streams.
>
> ---
>
> So, in conclusion, yes, there seems to be a bug that could leave a
> standby task stuck in Created in 1.1, which was fixed in 2.1.
>
> Additionally, there's another concurrency bug that we should fix to
> really prevent the cleaner from deleting stuff during rebalances.
> Since you're the ones who identified the bug, it's generally better
> for you to open the bug ticket.
>
> Finally, in terms of a workaround, I recommend you increase the state
> cleaner delay so that it won't compete as much with tasks during
> rebalances.
>
> Thanks so much to you both for identifying this and looking so deeply into it!
> -John
>
> On Thu, Nov 14, 2019 at 11:48 PM Giridhar Addepalli
> <giridhar1...@gmail.com> wrote:
>>
>> Hi John,
>>
>> Can you please point us to the code where Thread-2 will be able to
>> recreate the state directory once the cleaner is done?
>>
>> Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122,
>> the retries around locks were removed. Please let us know why the retry
>> mechanism was removed.
>>
>> Also, can you please explain the below comment in the
>> AssignedTasks.java#initializeNewTasks function?
>>
>>     catch (final LockException e) {
>>         // made this trace as it will spam the logs in the poll loop.
>>         log.trace("Could not create {} {} due to {}; will retry",
>>             taskTypeName, entry.getKey(), e.getMessage());
>>     }
>>
>> Thanks,
>> Giridhar.
>>
>> On 2019/11/14 21:25:28, John Roesler <j...@confluent.io> wrote:
>>> Hey Navinder,
>>>
>>> I think what's happening is a little different. Let's see if my
>>> worldview also explains your experiences.
>>>
>>> There is no such thing as "mark for deletion". When a thread loses a
>>> task, it simply releases its lock on the directory. If no one else on
>>> the instance claims that lock within `state.cleanup.delay.ms`
>>> milliseconds, then the state cleaner will itself grab the lock and
>>> delete the directory. On the other hand, if another thread (or the
>>> same thread) gets the task back and claims the lock before the
>>> cleaner, it will be able to re-open the store and use it.
>>>
>>> The default for `state.cleanup.delay.ms` is 10 minutes, which is
>>> actually short enough that it could pass during a single rebalance (if
>>> Streams starts recovering a lot of state). I recommend you increase
>>> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
>>>
>>> One thing I'm curious about... You didn't mention whether Thread-2
>>> eventually is able to re-create the state directory (after the cleaner
>>> is done) and transition to RUNNING. This should be the case. If not, I
>>> would consider it a bug.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are facing a peculiar situation in the 2.3 version of Kafka Streams.
>>>> First of all, I want to clarify whether it is possible that a Stream
>>>> Thread (say Stream Thread-1) which had got an assignment for a standby
>>>> task (say 0_0) can change to Stream Thread-2 on the same host post
>>>> rebalancing. This is happening for us, and post rebalancing, since
>>>> Stream Thread-1 had 0_0 and it is not assigned back to it, it closes
>>>> that task and marks it for deletion (after the cleanup delay time),
>>>> and meanwhile, the task gets assigned to Stream Thread-2.
>>>> When Stream Thread-2 tries to transition this task to Running, it gets
>>>> a LockException, which is caught in AssignedTasks#initializeNewTasks().
>>>> This makes 0_0 stay in the Created state on Stream Thread-2, and after
>>>> the cleanup delay is over, the task directories for 0_0 get deleted.
>>>>
>>>> Can someone please comment on this behavior?
>>>>
>>>> Thanks,
>>>> Navinder
>>>
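For reference, a minimal sketch of the workaround John recommends: raising the state cleaner delay (state.cleanup.delay.ms) via StreamsConfig. The class name, application id, and bootstrap servers are placeholders, and one hour is only the example value suggested in the thread, not a required setting.

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    class CleanupDelayConfigExample {
        static Properties streamsProps() {
            // Raise the cleaner delay from the default 10 minutes to one hour so
            // the cleaner is far less likely to race with tasks during a rebalance.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");        // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");        // placeholder
            props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60 * 60 * 1000L); // state.cleanup.delay.ms
            return props;
        }
    }

The same effect can be achieved by setting state.cleanup.delay.ms in whatever properties file the application already loads.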