Did we create tickets?

On 11/15/19 9:32 AM, John Roesler wrote:
> Hi Navinder and Giridhar,
> 
> Thanks for the clarification. I think the motivation is reasonable.
> Although, I'm still a little unsure how this condition occurs. In
> addition to what I mentioned before, I'd forgotten one other important
> detail, that the "cleaner thread" should only run when the
> KafkaStreams state is RUNNING. That is, it shouldn't be able to
> "steal" a state directory while a rebalance is ongoing. Perhaps what
> is happening is that the cleaner checks the state before the
> rebalance, then the rebalance starts while the cleaner is iterating
> over the stores, so the task happens to unlock the directory just
> before the cleaner locks it?
> 
> Entry point, in KafkaStreams:
> 
> stateDirCleaner.scheduleAtFixedRate(() -> {
>     // we do not use lock here since we only read on the value and act on it
>     if (state == State.RUNNING) {
>         stateDirectory.cleanRemovedTasks(cleanupDelay);
>     }
> }, cleanupDelay, cleanupDelay, TimeUnit.MILLISECONDS);
> 
> Perhaps instead of checking the state of Streams only _before_ looping
> over the tasks, we should check it:
> 1. before the loop (short circuit)
> 2. before grabbing the lock (again, short circuit)
> 3. after grabbing the lock (this is the critical one, to ensure we
> never try to delete a directory during rebalance)
> 
> (3) guarantees correctness because tasks hold their lock the entire
> time they are running. By making sure that the state is still RUNNING
> after the cleaner acquires the lock, we can be sure that all tasks
> have been assigned and their locks are already all held before we
> proceed to delete anything.
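The three checks described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual StateDirectory code: the class `GuardedCleanerSketch`, its in-memory lock set, and the `isRunning` supplier are all hypothetical stand-ins for the real directory locks and the KafkaStreams state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; this is not the real StateDirectory implementation.
final class GuardedCleanerSketch {
    // Task directories currently on disk, e.g. "0_0" (in-memory stand-in).
    private final Set<String> taskDirs = ConcurrentHashMap.newKeySet();
    // Directories whose lock is currently held by a task or by the cleaner.
    private final Set<String> lockedDirs = ConcurrentHashMap.newKeySet();

    void addTaskDir(String dir) { taskDirs.add(dir); }
    boolean exists(String dir) { return taskDirs.contains(dir); }
    // Non-blocking: returns false if someone else already holds the lock.
    boolean tryLock(String dir) { return lockedDirs.add(dir); }
    void unlock(String dir) { lockedDirs.remove(dir); }

    // Runs one cleanup pass and returns the directories it deleted.
    List<String> cleanRemovedTasks(BooleanSupplier isRunning) {
        List<String> deleted = new ArrayList<>();
        if (!isRunning.getAsBoolean()) {
            return deleted; // check 1: short circuit before the loop
        }
        for (String dir : new ArrayList<>(taskDirs)) {
            if (!isRunning.getAsBoolean()) {
                break; // check 2: short circuit before grabbing the lock
            }
            if (!tryLock(dir)) {
                continue; // a task owns this directory, leave it alone
            }
            try {
                if (!isRunning.getAsBoolean()) {
                    break; // check 3: a rebalance started, do not delete
                }
                taskDirs.remove(dir);
                deleted.add(dir);
            } finally {
                unlock(dir); // always release the cleaner's lock
            }
        }
        return deleted;
    }
}
```

In this sketch, if the state stops being RUNNING after the cleaner has taken the lock, check 3 makes it release the lock and leave the directory in place, so the newly assigned task can still claim it.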
> 
> ----
> 
> In addition to that apparent race condition, if a task can get stuck
> indefinitely in "Created", that would also be a bug. AFAIK, the
> StreamThread should just keep attempting to transition all tasks to
> running. I.e., it should never leave a task behind. The alternative is
> that a task could raise a fatal exception and kill the StreamThread
> itself, which should then cause the whole KafkaStreams instance to
> shut down.
> 
> I think that Giridhar has identified the problematic block, and
> actually, I see that this seems fixed as of 2.1:
> 
> if (active.allTasksRunning()) {
>     final Set<TopicPartition> assignment = consumer.assignment();
>     log.trace("Resuming partitions {}", assignment);
>     consumer.resume(assignment);
>     assignStandbyPartitions();
>     // in 1.1, this says "return true"
>     return standby.allTasksRunning();
> }
> return false;
> 
> Is there any chance you could upgrade at least to 2.1 and see what you think?
> 
> ----
> 
> Regarding the comment about spamming the logs, this seems to be where
> we're handling the lock exception and just deferring until the next
> attempt at initializing the task. If we printed the log every time
> this happens, it would print at a very high rate during the rebalance,
> while the task is contending with the cleaner. But, since we _don't_
> actually try again in 1.1, I guess this comment is incorrect. However,
> it becomes true in 2.1, where we _do_ retry in StreamThread.
> 
> ----
> 
> As for why the retries are removed, it _looks_ like they are removed
> in favor of retrying at a higher level. This makes sense, as
> busy-waiting on the lock itself would block further progress at the
> top level, potentially causing Streams to fail its heartbeat, fall out
> of the group, and cause another round of rebalances.
> 
> Clearly, though, the "retry at the top level" part isn't actually
> functioning in your version of Streams.
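To illustrate the difference between busy-waiting on a lock and retrying at a higher level, here is a small sketch. It assumes a hypothetical `TopLevelRetrySketch` class and a caller-supplied `tryLock` predicate; neither is part of Kafka Streams. Each pass attempts every Created task once without blocking and reports whether all tasks are running, so the caller can go back to polling and try again on its next loop iteration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; names do not correspond to real Kafka Streams classes.
final class TopLevelRetrySketch {
    private final List<String> created = new ArrayList<>();
    private final List<String> running = new ArrayList<>();

    void addCreated(String taskId) { created.add(taskId); }
    List<String> running() { return running; }

    // One non-blocking pass over the Created tasks.
    // Returns true only when no task is left in Created.
    boolean initializeNewTasks(Predicate<String> tryLock) {
        Iterator<String> it = created.iterator();
        while (it.hasNext()) {
            String taskId = it.next();
            if (tryLock.test(taskId)) {
                it.remove();
                running.add(taskId);
            }
            // Otherwise the task stays in Created and is retried on the next pass.
        }
        return created.isEmpty();
    }
}
```

The important part is the return value: as long as it is false, the caller keeps invoking `initializeNewTasks` on later iterations instead of treating initialization as finished.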
> 
> ----
> 
> So, in conclusion, yes, there seems to be a bug that could leave a
> standby task stuck in Created in 1.1, which was fixed in 2.1.
> 
> Additionally, there's another concurrency bug that we should fix to
> really prevent the cleaner from deleting stuff during rebalances.
> Since you're the ones to identify the bug, it's generally better for
> you to open the bug ticket.
> 
> Finally, in terms of a workaround, I recommend you increase the state
> cleaner delay so that it won't compete as much with tasks during
> rebalance.
> 
> Thanks so much to you both for identifying this and looking so deeply into it!
> -John
> 
> On Thu, Nov 14, 2019 at 11:48 PM Giridhar Addepalli
> <giridhar1...@gmail.com> wrote:
>>
>> Hi John,
>>
>> Can you please point us to the code where Thread-2 will be able to recreate
>> the state directory once the cleaner is done?
>>
>> Also, we see that in https://issues.apache.org/jira/browse/KAFKA-6122 the
>> retries around locks were removed. Please let us know why the retry
>> mechanism was removed.
>>
>> Also, can you please explain the comment below in the
>> AssignedTasks.java#initializeNewTasks function?
>>
>> catch (final LockException e) {
>>     // made this trace as it will spam the logs in the poll loop.
>>     log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
>> }
>>
>> Thanks,
>> Giridhar.
>>
>> On 2019/11/14 21:25:28, John Roesler <j...@confluent.io> wrote:
>>> Hey Navinder,
>>>
>>> I think what's happening is a little different. Let's see if my
>>> worldview also explains your experiences.
>>>
>>> There is no such thing as "mark for deletion". When a thread loses a
>>> task, it simply releases its lock on the directory. If no one else on
>>> the instance claims that lock within `state.cleanup.delay.ms`
>>> milliseconds, then the state cleaner will itself grab the lock and
>>> delete the directory. On the other hand, if another thread (or the
>>> same thread) gets the task back and claims the lock before the
>>> cleaner, it will be able to re-open the store and use it.
>>>
>>> The default for `state.cleanup.delay.ms` is 10 minutes, which is
>>> actually short enough that it could pass during a single rebalance (if
>>> Streams starts recovering a lot of state). I recommend you increase
>>> `state.cleanup.delay.ms` by a lot, like maybe set it to one hour.
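As a rough illustration of that setting, the sketch below builds a `Properties` object with the cleanup delay raised to one hour. The class and method names are hypothetical; only the key `state.cleanup.delay.ms` comes from the discussion above (it is also available as the constant `StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG`). The rest of the Streams configuration is omitted.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper; only the config key is taken from Kafka Streams.
final class CleanupDelayConfigSketch {
    // Returns properties with the state cleaner delay set to the given duration.
    static Properties withCleanupDelay(Duration delay) {
        Properties props = new Properties();
        // The value is stored as a Long number of milliseconds.
        props.put("state.cleanup.delay.ms", delay.toMillis());
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCleanupDelay(Duration.ofHours(1));
        System.out.println(props.get("state.cleanup.delay.ms"));
    }
}
```

These properties would then be merged with the application's other settings (application id, bootstrap servers, and so on) before constructing the KafkaStreams instance.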
>>>
>>> One thing I'm curious about... You didn't mention if Thread-2
>>> eventually is able to re-create the state directory (after the cleaner
>>> is done) and transition to RUNNING. This should be the case. If not, I
>>> would consider it a bug.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 3:02 PM Navinder Brar
>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>
>>>> Hi,
>>>> We are facing a peculiar situation in the 2.3 version of Kafka Streams.
>>>> First of all, I want to clarify whether a standby task (say 0_0) that was
>>>> assigned to one Stream Thread (say Stream Thread-1) can move to Stream
>>>> Thread-2 on the same host after a rebalance. This is what is happening
>>>> for us: after the rebalance, since 0_0 is not assigned back to Stream
>>>> Thread-1, that thread closes the task and marks it for deletion (after
>>>> the cleanup delay), and meanwhile the task gets assigned to Stream
>>>> Thread-2. When Stream Thread-2 tries to transition this task to Running,
>>>> it gets a LockException, which is caught in
>>>> AssignedTasks#initializeNewTasks(). This leaves 0_0 in the Created state
>>>> on Stream Thread-2, and once the cleanup delay is over the task
>>>> directories for 0_0 get deleted.
>>>> Can someone please comment on this behavior?
>>>> Thanks,
>>>> Navinder
>>>
