Thanks for your feedback, Jun.

Here are my changes to the KIP:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=21&selectedPageVersions=20

My comments are below...

On Wed, Aug 5, 2020 at 1:59 PM Jun Rao <j...@confluent.io> wrote:
>
> Hi, Jose,
>
> Thanks for the KIP. A few comments below.
>
> 10. I agree with Jason that it's useful to document the motivation a bit
> clearer. Regarding semantic/performance, one benefit of snapshotting is
> that it allows changes to be encoded incrementally instead of using the
> full post image. For example, in KIP-631, each partition has multiple
> fields like assigned replicas, leader, epoch, isr, etc. If only isr is
> changed, the snapshotting approach allows the change to be represented with
> just the new value in isr. Compaction will require all existing fields to
> be included in order to represent just an isr change. This is just because
> we can customize the combining logic with snapshotting.
>

Yes. Right now the IsrChange record from KIP-631 includes the ISR as
well as the leader and leader epoch. I think we can split this record
into two records:
1. An ISR change record that includes the topic, partition, and ISR.
2. A leader change record that includes the topic, partition, leader and leader epoch.
I'll bring this up in the discussion thread for that KIP.
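Roughly, the split could look like this (an illustrative sketch only;
the actual record schemas belong to KIP-631):

```
// Illustrative sketch only; the real record definitions are part of KIP-631.
case class IsrChangeRecord(topic: String, partition: Int, isr: Seq[Int])
case class LeaderChangeRecord(topic: String, partition: Int, leader: Int, leaderEpoch: Int)
```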

> As for the
> performance benefit, I guess in theory snapshotting allows the snapshot to
> be updated in-place incrementally without having to read the full state in
> the snapshot. BTW, during compaction, we only read the cleaned data once
> instead of 3 times.
>

Doesn't compaction need to read the cleaned records to check whether
each key is in the map of keys to offsets? I made the following
changes to the KIP:

2. With log compaction the broker needs to:
  a. read 1MB/s from the head of the log to update the in-memory state
  b. read 1MB/s to update the map of keys to offsets
  c. read 3MB/s (100MB from the already compacted log, 50MB from the new
key-value records) from the older segments. The log will accumulate
50MB worth of changes in 50 seconds before compacting because the
default configuration has a minimum clean ratio of 50%.

The "100MB in the already compacted log" are these cleaned records.
Let me know what you think and if I am missing something.
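To spell out the arithmetic behind (c), using the numbers assumed in
the KIP (a back-of-the-envelope sketch, not a measurement):

```
// Back-of-the-envelope check of the 3MB/s figure using the assumed workload above.
val compactedLogMB = 100.0        // already compacted records re-read by the cleaner
val newRecordsMB = 50.0           // changes accumulated before the cleaner runs
val compactionIntervalSec = 50.0  // 50MB accumulated at 1MB/s of writes

// (100MB + 50MB) / 50s = 3MB/s read from the older segments during compaction
val compactionReadMBps = (compactedLogMB + newRecordsMB) / compactionIntervalSec
```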

> 11. The KIP mentions topic id. Currently there is no topic id. Does this
> KIP depend on KIP-516?
>

For the purpose of measuring the impact, I was using the records
proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for
its design and implementation. I was just referencing that KIP in the
motivation and analysis. The KIP only assumes the changes in KIP-595,
which has been approved but is not yet part of trunk.

In the Overview section the KIP mentions: "This KIP assumes that
KIP-595 has been approved and implemented."

> 12. Is there a need to keep more than 1 snapshot? It seems we always expose
> the latest snapshot to clients.
>

The KIP proposes keeping more than one snapshot so as not to
invalidate any pending/concurrent `FetchSnapshot` requests that are
attempting to fetch a snapshot that could otherwise be deleted. I'll
remove this wording since the first version of this implementation
will probably not have this feature, as it requires extra
coordination. The implementation will still allow for multiple
snapshots because generating a snapshot is not atomic with respect to
increasing the LBO.


> 13. "During leader election, followers with incomplete or missing snapshot
> will send a vote request and response as if they had an empty log." Hmm, a
> follower may not have a snapshot created, but that doesn't imply its log is
> empty.
>

Yes. I fixed the "Validation of Snapshot and Log"  and that sentence.
I basically added an additional condition where a snapshot is not
required if the LBO is 0.

> 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do we
> compare an offset to a time?
>

Yeah. This may be hard to implement. I am trying to avoid invalidating
followers and observers by aggressively deleting an offset/record
which they are trying to fetch. It is possible that
`controller.snapshot.minimum.records` is good enough to throttle how
quickly the LBO increases.
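For illustration, the idea is roughly the following (a sketch with
hypothetical names):

```
// Hypothetical sketch: only generate a new snapshot, and therefore only allow
// the LBO to advance, once enough records have been applied since the last snapshot.
def shouldGenerateSnapshot(appliedRecordsSinceLastSnapshot: Long, snapshotMinimumRecords: Long): Boolean =
  appliedRecordsSinceLastSnapshot >= snapshotMinimumRecords
```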

> 15. "Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local state machine has
> generated a snapshot that includes to the log begin offset minus one." Does
> the observer store the log? I thought it only needed to maintain a
> snapshot. If so, the observer doesn't need to maintain LBO.
>

In KIP-595 observers are similar to voters/followers in that they
Fetch the log and the snapshot. Two of the distinctions are that they
don't participate in the leader election and they are not included
when computing the high-watermark. Regarding storing: it is possible
that observers never need to store the log since they don't vote or
become leaders. I think in the future we would like to implement
"KIP-642: Dynamic quorum reassignment" which would add the capability
to upgrade an observer to a voter. In that case we do want to store
the log.


> 16. "There are two cases when the Kafka Controller needs to load the
> snapshot: When it is booting. When the follower and observer replicas
> finishes fetching a new snapshot from the leader." For faster failover, it
> seems it's useful for a non-controller voter to maintain the in-memory
> metadata state. In order to do that, it seems that every voter needs to
> load the snapshot on booting?
>

Yeah. I think there is some confusion on terminology. This document
refers to Kafka Controller as the component in every voter that reads
the snapshot and log. The Active Controller is the leader from that
set of voters (Kafka Controller). I added a terminology section in the
Overview section.

> 17. "There is an invariant that every log must have at least one snapshot
> where the offset of the snapshot is between LogBeginOffset - 1 and
> High-Watermark. If this is not the case then the replica should assume that
> the log is empty." Does that invariant hold when there is no snapshot
> initially?
>

You are correct that this invariant doesn't hold for a replica that
has an LBO of 0. I fixed this section to make this additional case
clear.
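Roughly, the validation I have in mind is the following sketch
(illustrative names only, not code from the KIP):

```
// Sketch of the validation rule: when the log does not start at offset 0, at least
// one snapshot must have an end offset in the range [LogBeginOffset - 1, high-watermark].
def isSnapshotAndLogValid(logBeginOffset: Long, highWatermark: Long, snapshotEndOffsets: Seq[Long]): Boolean =
  logBeginOffset == 0 ||
    snapshotEndOffsets.exists(offset => offset >= logBeginOffset - 1 && offset <= highWatermark)
```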

> 18. "The __cluster_metadata topic will have an cleanup.policy value of
> snapshot" Is there a need to make this configurable if it's read-only?
>

Hmm. Every topic has a default cleanup.policy of delete. Looking at
the code, we also require that at least one of them is set. We need a
way to disable the existing delete and compact cleanup policies. In
LogConfig.scala we have:

```
...
.define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
  ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
  KafkaConfig.LogCleanupPolicyProp)
...
```
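For illustration, one way to allow the new policy would be to extend
the list of accepted values; this is only a sketch of the idea, not
necessarily the change the KIP will end up making:

```
// Sketch only: accept "snapshot" as a cleanup.policy value so that the
// __cluster_metadata topic can disable both deletion and compaction.
val Snapshot = "snapshot"
...
.define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
  ValidList.in(LogConfig.Compact, LogConfig.Delete, Snapshot), MEDIUM, CompactDoc,
  KafkaConfig.LogCleanupPolicyProp)
...
```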

> 19. OFFSET_OUT_OF_RANGE in FetchSnapshotResponse: It seems that
> POSITION_OUT_OF_RANGE is more appropriate?
>

Are you suggesting we introduce a new error type? For
OFFSET_OUT_OF_RANGE we have:

OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range of offsets maintained by the server.",
  OffsetOutOfRangeException::new),

Are you arguing that this description is not entirely correct because
the requested offset is instead encoded/included in a snapshot?

-- 
-Jose
