-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/
-----------------------------------------------------------

(Updated Feb. 12, 2014, 7:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1012
    https://issues.apache.org/jira/browse/KAFKA-1012


Repository: kafka


Description
-------

I picked up most of Tejas' patch and made various edits for review here, as I
would like this to be completed and closed.

Here is a link to the original implementation wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

Much of it is the same in this revision of the patch, but there is a fair
amount of refactoring. This patch does not use an "embedded producer" in the
consumer; i.e., the consumer issues offset commit/fetch requests directly to
the broker. Also, I decided against doing any kind of request forwarding and
added a "ConsumerMetadataRequest" that will be used to determine the offset
coordinator (and subsequently the group coordinator, which may be useful for
the client rewrite - see
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).
There were also some questions on how to support multiple offset manager
implementations cleanly. After thinking about it, I believe the code is
simpler and clearer if we just have one good offset storage mechanism (i.e.,
Kafka-based). Consumers that want to store offsets elsewhere can do so on
their own. (However, if we do want to support this somewhat cleanly, see the
discussion on separation of APIs below.)
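
To make the coordinator-discovery flow concrete, here is a minimal client-side
sketch. The constructor and field names (e.g., coordinatorOpt, errorCode) are
assumptions based on the new ConsumerMetadataRequest/Response classes in this
patch and may differ from the final diff:

    import kafka.api.{ConsumerMetadataRequest, ConsumerMetadataResponse}
    import kafka.consumer.SimpleConsumer

    // Ask any broker for the offset coordinator of a group, then direct
    // subsequent OffsetCommit/OffsetFetch requests to that coordinator.
    // SimpleConsumer args: host, port, soTimeout, bufferSize, clientId
    val consumer = new SimpleConsumer("broker-host", 9092, 10000, 64 * 1024, "offset-client")
    try {
      val response: ConsumerMetadataResponse =
        consumer.send(new ConsumerMetadataRequest("my-group"))
      response.coordinatorOpt match {
        case Some(coordinator) =>
          println("Offset coordinator: %s:%d".format(coordinator.host, coordinator.port))
        case None =>
          println("No coordinator yet (error %d); retry after a backoff".format(response.errorCode))
      }
    } finally {
      consumer.close()
    }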

Here is a quick recap of how offset management within Kafka works:
- A special __offsets topic holds consumer offsets.
- The consumer group serves as the partition key for offsets committed to
  the __offsets topic; i.e., all offsets for all topics that a group
  consumes will be in a single partition of the offsets topic (see the
  sketch after this list).
- The "group-topic-partition" is the actual (stored) key in each message of
  the offsets topic.  This facilitates de-duplication (and thus removal) of
  older offsets.
- The offset manager also contains an in-memory cache of offsets so it can
  serve offset fetch requests quickly.
- Think of a commit as little more than a produce request: the offsets cache
  is updated if and only if the commit is appended to the __offsets log as a
  regular produce request, so the semantics are identical to those of a
  produce request. Offset fetches return whatever is in the cache. If the
  entry is absent, and offsets have not yet been loaded from the log into
  the cache (on becoming a leader), then we return an "OffsetsLoading" error
  code.

(Tejas' wiki has pretty good diagrams that describe the above.)
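
As a concrete illustration of the partitioning and key layout (a minimal
sketch - the helper names here are hypothetical; the real logic lives in
OffsetManager in this patch):

    import kafka.utils.Utils

    // All offsets for a group land in one partition of __offsets: the group id
    // is the partition key, so partition = abs(hash(group)) % partition count.
    def offsetsPartitionFor(group: String, offsetsTopicPartitionCount: Int): Int =
      Utils.abs(group.hashCode) % offsetsTopicPartitionCount

    // Each message in __offsets is keyed on (group, topic, partition) so that
    // de-duplication keeps only the latest committed offset per key.
    case class OffsetKey(group: String, topic: String, partition: Int)

    // Prints the __offsets partition that would hold my-group's offsets,
    // assuming (hypothetically) 50 partitions for the offsets topic.
    println(offsetsPartitionFor("my-group", 50))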

Some more details:

- Atomicity per-commit: One drawback of ZooKeeper-based offset commits is
  that when we commit multiple offsets (since we don't use multi-write) we
  have to write the offsets serially, so the commit is not atomic. In this
  implementation I went with Jun's suggestion of using a compressed message
  set. This ensures that we disallow partial commits of a bulk commit (see
  the sketch after this list). I have hard-coded this to GZIP but maybe it
  is better to just expose a config. Another option is to introduce an
  identity compression codec.
- The main corner cases to consider are leader movements (due to broker
  failures) that coincide with offset commits/fetches. Offset fetches would
  only occur if there are consumer-side rebalances or shutdowns. The
  guarantees we want to provide are: (i) successfully acknowledged offset
  commits should be returned on the next offset fetch - i.e., should not be
  lost; and (ii) offset fetches should never return a stale offset.
  - On becoming a follower of an offsets topic partition:
    - Partition.makeFollower clears the offset cache of entries belonging to
      this partition of __offsets.
    - Any subsequent offset fetch request will find out that the partition
      is no longer a leader and fail. There is one problem in the existing
      patch which I will highlight in the RB along with a suggested fix.
    - Likewise, any subsequent offset commit request will fail (since the
      underlying producer request will fail). It is okay if the underlying
      producer request succeeds and the broker becomes a follower for that
      partition just before the offset cache is updated (since the broker
      will not serve any OffsetFetchRequests for that partition until it
      becomes a leader again).
  - On becoming a leader of an offsets topic partition:
    - Partition.makeLeader will load the offsets from the log
      (asynchronously). While this is in progress, the broker rejects offset
      fetches to this partition. Offset commits may continue to arrive -
      i.e., they will be appended to the log and then written to the cache.
      The load loop might overwrite a cache entry with an earlier offset from
      the log, but that is okay since it will eventually reach the more
      recent update in the log and load that into the cache.
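
To illustrate the atomicity point above, here is a rough sketch of how a bulk
commit can be folded into a single compressed message set so that the append
to the __offsets log succeeds or fails as a unit. The key/value encoders below
are hypothetical stand-ins for the actual schemas in OffsetManager:

    import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

    // Hypothetical serializers - the real key/value formats live in OffsetManager.
    def encodeKey(group: String, topic: String, partition: Int): Array[Byte] =
      "%s-%s-%d".format(group, topic, partition).getBytes("UTF-8")
    def encodeValue(offset: Long, metadata: String): Array[Byte] =
      "%d:%s".format(offset, metadata).getBytes("UTF-8")

    // One message per (topic, partition) offset in the commit request...
    val offsets = Map(("my-topic", 0) -> 120L, ("my-topic", 1) -> 98L)
    val messages = offsets.map { case ((topic, partition), offset) =>
      // Message(value bytes, key bytes)
      new Message(encodeValue(offset, ""), encodeKey("my-group", topic, partition))
    }.toSeq

    // ...wrapped into one GZIP-compressed message set, so the log append (and
    // hence the whole bulk commit) either succeeds or fails entirely.
    val commitMessageSet = new ByteBufferMessageSet(GZIPCompressionCodec, messages: _*)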

Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
- The broker config should set offsets.backup.enabled=true (see the config
  excerpt after this list).
- Upgrade the brokers to the latest jar. (Consumers still commit
  directly to ZooKeeper).
- Start migrating the consumers over.
- Consumers will now start sending offset commits to the broker. Since the
  backup setting is enabled, offsets will also be committed to ZooKeeper.
  This is necessary when migrating consumers.
- After _all_ consumers have moved over, you can turn off the backup.
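
For reference, a minimal server.properties excerpt for the migration window.
Only offsets.backup.enabled is named in this description; everything else is
assumed to stay as-is:

    # server.properties (during the migration window only)
    # Commit offsets to the __offsets topic AND back them up to ZooKeeper
    # until all consumers in the group have moved over; then disable this.
    offsets.backup.enabled=true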

I have made a number of preliminary comments as TODOs in the RB myself (i.e.,
as a note to myself and others reviewing).

Questions/comments for discussion
- Should we explicitly disallow changes to the number of offset topic
  partitions? Disallowing this (or at least prompting with a warning) is
  necessary since changing the number of partitions would affect the
  partitioning strategy.
- Should we remove per-partition error codes for offset commits and use just
  a global error code for the entire request? I'm using compressed message
  sets for commits; i.e., the log append for a given commit will either fail
  entirely or succeed entirely. The OffsetCommitResponse contains
  per-partition error codes, so if the log append fails for any reason the
  same error code would apply to all partitions; i.e., it is sufficient to
  have a global error code. I think we currently have per-partition error
  codes due to the fact that offset commit requests can include metadata for
  each offset. The per-partition error code is set to MetadataTooLarge if
  the metadata entry exceeds the MaxMetadataLength. However, in this case I
  would prefer to just fail the entire request as opposed to doing partial
  commits (as I am doing in the current patch). Anyone have thoughts on this?
- Error codes: right now I'm using existing error codes (with the exception
  of OffsetsLoading). It may be better to return more specific error codes,
  but I'm not sure it matters, since the client-side implementation needs to
  check for _any_ error and, if any error exists (other than
  MetadataTooLarge), just retry the offset commit/fetch until it succeeds;
  i.e., the client should not really care about the actual error. If people
  have any strong preference on this let me know.
- Separation of APIs: Right now, the offset manager and replica manager are
  intertwined, which is less than ideal. It is okay if the offset manager
  depends on the replica manager, but not the other way around. Ideally, I
  would like to have KafkaApis hand off offset commit/fetch requests to the
  offset manager, which then handles them. However, the inter-dependence
  comes about due to the need to clear out the offset cache on becoming a
  follower and the need to load offsets on becoming a leader. I think we can
  improve the separation as follows (a rough sketch of the resulting
  layering follows this list):
  - Don't optimistically load offsets/clear offsets on a leader/follower
    transition. Instead, load offsets only when an offset fetch request
    arrives for a partition that has not been loaded yet.
  - The OffsetManager will need to maintain a Map[partition ->
    lastKnownLeaderEpoch] to determine whether to load offsets or not.
  - The above will eliminate the reference to OffsetManager from
    ReplicaManager. KafkaApis still needs to reference the OffsetManager and
    will need to create the offset commit message to append to the __offsets
    log.
  - We can actually avoid the need for KafkaApis to know about offset commit
    messages as well: in order to do that, we will need to create a
    "DurableLog" layer on top of LogManager and move all the purgatory stuff
    in there. The LogManager supports appends/reads from the local log, but
    does not know anything about the replicas. Instead, we can have a
    DurableLog layer that depends on ReplicaManager and LogManager and
    contains the Producer/Fetch-Request purgatories. OffsetManager would
    then depend on this DurableLog component, and KafkaApis can just hand
    off ProducerRequests and FetchRequests to the DurableLog layer directly.
    It will hand off OffsetCommit/OffsetFetch requests to the OffsetManager,
    which will then hand them off to the DurableLog layer.
  - Is the above worth it? I'm not sure it is, especially if we are sticking
    to only one offset management implementation.
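
To make the proposed layering a bit more concrete, here is a rough sketch in
terms of traits. These names and signatures are purely illustrative (nothing
called DurableLog exists in the patch); the only point is the direction of
the dependencies:

    import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
    import kafka.message.{ByteBufferMessageSet, MessageSet}

    // Wraps LogManager + ReplicaManager and owns the produce/fetch purgatories;
    // KafkaApis hands ProducerRequests and FetchRequests straight to this layer.
    trait DurableLog {
      def append(topicAndPartition: TopicAndPartition, messages: ByteBufferMessageSet): Unit
      def read(topicAndPartition: TopicAndPartition, offset: Long, maxBytes: Int): MessageSet
    }

    // Depends only on DurableLog (not on ReplicaManager); KafkaApis hands
    // OffsetCommit/OffsetFetch requests here, and commits become appends to
    // the __offsets log via DurableLog.
    trait OffsetManager {
      def commitOffsets(group: String,
                        offsets: Map[TopicAndPartition, OffsetMetadataAndError]): Unit
      def fetchOffsets(group: String,
                       partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError]
    }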


Diffs
-----

  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION 
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c 
  core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532 
  core/src/main/scala/kafka/api/RequestKeys.scala c81214f 
  core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
  core/src/main/scala/kafka/cluster/Partition.scala 1087a2e 
  core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce 
  core/src/main/scala/kafka/common/OffsetLoadInProgressException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3 
  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 6dae149 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2 
  core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a 
  core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31 
  core/src/main/scala/kafka/server/KafkaApis.scala ae2df20 
  core/src/main/scala/kafka/server/KafkaConfig.scala 3c3aafc 
  core/src/main/scala/kafka/server/KafkaServer.scala 5e34f95 
  core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaManager.scala 21bba48 
  core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c 
  core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c 
  core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/Utils.scala a89b046 
  core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala eb274d1 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c 
  sbt 944ebf8 
  system_test/mirror_maker/README da53c14 
  system_test/mirror_maker/bin/expected.out 0a1bbaf 
  system_test/mirror_maker/bin/run-test.sh e5e6c08 
  system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015 
  system_test/mirror_maker/config/mirror_producer.properties aa8be65 
  system_test/mirror_maker/config/server_source_1_1.properties 2f070a7 
  system_test/mirror_maker/config/server_source_1_2.properties f9353e8 
  system_test/mirror_maker/config/server_source_2_1.properties daa01ad 
  system_test/mirror_maker/config/server_source_2_2.properties be6fdfc 
  system_test/mirror_maker/config/server_target_1_1.properties d37955a 
  system_test/mirror_maker/config/server_target_1_2.properties aa7546c 
  system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015 
  system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b 
  system_test/mirror_maker/config/zookeeper_source_1.properties f851796 
  system_test/mirror_maker/config/zookeeper_source_2.properties d534d18 
  system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1 
  system_test/offset_management_testsuite/cluster_config.json PRE-CREATION 
  system_test/offset_management_testsuite/config/console_consumer.properties PRE-CREATION 
  system_test/offset_management_testsuite/config/server.properties PRE-CREATION 
  system_test/offset_management_testsuite/config/zookeeper.properties PRE-CREATION 
  system_test/offset_management_testsuite/offset_management_test.py PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties PRE-CREATION 
  system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json PRE-CREATION 
  system_test/testcase_to_run.json 8252860 
  system_test/utils/kafka_system_test_utils.py fb4a9c0 
  system_test/utils/testcase_env.py bee8716 

Diff: https://reviews.apache.org/r/18022/diff/


Testing
-------


Thanks,

Joel Koshy
