-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/
-----------------------------------------------------------
(Updated March 5, 2014, 11:53 p.m.)


Review request for kafka.


Changes
-------

I believe I have addressed/incorporated all of the comments from the previous review in this updated patch. Let me know if I have missed anything.

A couple of additional comments:
- I will file follow-up issues for these:
  - Fix commitOffsets in SimpleConsumer (although I'm not sure SimpleConsumer needs a commitOffsets at all). It may be better to have a generic "OffsetsCommitter" to do commits/fetches.
  - LogCleaner does not work with compressed topics:
    - First, the iteration over compressed message sets is incorrect: the position is advanced by the size of the decompressed messages within each compressed message set.
    - Second, it only writes out uncompressed messages. We should probably have a cleaner config that specifies the compression codec to use for the compacted log.
  - I should probably clear the offsets cache on StopReplica (say, during partition reassignment), but I have not looked carefully enough to be sure, and it is non-critical.
  - Import/export of ZK offsets and updateoffsetsinzk need to be updated - preferably with an uber consumer-admin tool.
- Given that the log cleaner does not work with compressed topics, I have set the default for the offsets topic to no compression.
- Since this is a new feature, I'm making consumers commit to ZooKeeper by default.
- I used the new protocol schema utilities for versioning the on-disk messages of the offsets topic. Those classes should probably move to a more general place since they are useful for more than just protocol schemas.
- I disallowed changes to the partition count of the offsets topic, but maybe we should expose a --force option. If anyone has thoughts on this, let me know. I also outlined possible approaches to changing the partition count on the fly (see Neha's review).
- Here's what I ended up doing for offset commit/fetch error codes in the consumer connector (there was a shutdown issue in the previous patch):
  - If an offset commit fails during a rebalance or start-up, retry indefinitely (since we need availability during normal operation).
  - If an offset commit fails during shutdown, retry only up to the configured maximum number of retries.
  - If an offset fetch fails during shutdown, abort and fail the rebalance.
- In my tests, offset loads are taking longer than I expect - I'm investigating that separately.


Bugs: KAFKA-1012
    https://issues.apache.org/jira/browse/KAFKA-1012


Repository: kafka


Description
-------

I picked up most of Tejas' patch and made various edits for review here, as I would like this to be completed and closed. Here is a link to the original implementation wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management

A lot of it is the same in this revision of the patch, but there is a bunch of refactoring. This patch does not use an "embedded producer" in the consumer; i.e., the consumer issues offset commit/fetch requests directly to the broker. Also, I decided against doing any kind of request forwarding and added a "ConsumerMetadataRequest" that will be used to determine the offset coordinator (and subsequently the group coordinator, which may be useful for the client rewrite - see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).

There were also some questions on how to support multiple offset manager implementations cleanly. After thinking about it, I think the code is simpler and clearer if we just have one good offset storage mechanism (i.e., Kafka-based). Consumers that want to store offsets elsewhere can do so on their own. (However, if we do want to do this somewhat cleanly, see the discussion on separation of APIs below.)
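Since there is no request forwarding, a client would first ask any broker for the coordinator of its group (via the new ConsumerMetadataRequest) and then send offset commit/fetch requests to that coordinator directly. The following is only a rough sketch of that flow, not code from the patch: the exact constructors, response field names (e.g. coordinatorOpt) and the BlockingChannel wiring below are assumptions.

    import kafka.api.{ConsumerMetadataRequest, ConsumerMetadataResponse}
    import kafka.common.ErrorMapping
    import kafka.network.BlockingChannel

    object CoordinatorDiscoverySketch {
      // Sketch only: connect to any broker, ask who coordinates offsets for this group,
      // and return a channel to that coordinator (or None so the caller can retry).
      def connectToOffsetCoordinator(group: String,
                                     anyBrokerHost: String,
                                     anyBrokerPort: Int): Option[BlockingChannel] = {
        val channel = new BlockingChannel(anyBrokerHost, anyBrokerPort,
          BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 5000)
        channel.connect()
        channel.send(ConsumerMetadataRequest(group))   // remaining constructor args assumed to default
        val response = ConsumerMetadataResponse.readFrom(channel.receive().buffer)
        channel.disconnect()

        if (response.errorCode == ErrorMapping.NoError)
          response.coordinatorOpt.map { coordinator =>  // field name assumed
            val coordinatorChannel = new BlockingChannel(coordinator.host, coordinator.port,
              BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 5000)
            coordinatorChannel.connect()
            coordinatorChannel  // OffsetCommit/OffsetFetch requests go to this broker
          }
        else
          None  // e.g. ConsumerCoordinatorNotAvailable: re-issue the metadata request after a backoff
      }
    }

If the response indicates ConsumerCoordinatorNotAvailable or NotCoordinatorForConsumer, the client would simply re-discover the coordinator and retry.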
Here is a quick recap of how offset management within Kafka works:
- A special __offsets topic holds consumer offsets.
- The consumer group serves as the partition key for offsets committed to the __offsets topic; i.e., all offsets for all topics that a group consumes will be in a single partition of the offsets topic.
- The "group-topic-partition" is the actual (stored) key in each message of the offsets topic. This facilitates de-duplication (and thus removal) of older offsets.
- The offset manager also contains an in-memory cache of offsets so it can serve offset fetch requests quickly.
- Think of a commit as little more than a produce request: we update the offsets cache if and only if the commit is appended to the __offsets log as a regular produce request, so the semantics are identical to those of a produce request. Offset fetches return whatever is in the cache. If the offset is absent and offsets have not yet been loaded from the logs into the cache (on becoming a leader), then we return an "OffsetsLoading" error code.

(Tejas' wiki has pretty good diagrams that describe the above.)

Some more details:
- Atomicity per commit: One drawback of ZooKeeper-based offset commits is that when we commit multiple offsets (since we don't use multi-write) we have to write them serially, so the commit is not atomic. In this implementation I went with Jun's suggestion of using a compressed message set: this ensures that a bulk commit cannot be partially applied. I have hard-coded the codec to GZIP, but maybe it is better to just expose a config. Another option is to introduce an identity compression codec.
- The main corner cases to consider are leader movement due to broker failures combined with simultaneous offset commits/fetches. Offset fetches would only occur during consumer-side rebalances or shutdowns. The guarantees we want to provide are: (i) successfully acknowledged offset commits should be returned on the next offset fetch - i.e., they should not be lost; (ii) offset fetches should never return a stale offset.
- On becoming a follower of an offsets topic partition:
  - Partition.makeFollower clears the offset cache of entries belonging to this partition of __offsets.
  - Any subsequent offset fetch request will find that this broker is no longer the leader for that partition and fail. There is one problem in the existing patch which I will highlight in the RB along with a suggested fix.
  - Likewise, any subsequent offset commit request will fail (since the underlying producer request will fail). It is okay if the underlying producer request succeeds and the broker becomes a follower for that partition just before the offset cache is updated, since the broker will not serve any OffsetFetchRequests for that partition until it becomes a leader again.
- On becoming a leader of an offsets topic partition:
  - Partition.makeLeader loads the offsets from the log (asynchronously). While this is in progress, the broker rejects offset fetches for this partition. Offset commits may continue to arrive - i.e., they will be appended to the log and then written to the cache. The load loop might overwrite such an entry with an earlier offset from the log, but that is okay: it will eventually reach the more recent update in the log and load that into the cache. (A rough sketch of this cache behavior follows below.)
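To make the cache and partitioning description above concrete, here is a small, self-contained sketch in the spirit of the patch's OffsetManager, but with hypothetical names (OffsetCache, loadFromLog, etc.) and none of the actual patch code. It shows the group-keyed partitioning, the cache keyed by group-topic-partition, and why the load loop's blind overwrites are safe: records are replayed in log order, so the latest value always ends up in the cache.

    import java.util.concurrent.ConcurrentHashMap

    case class GroupTopicPartition(group: String, topic: String, partition: Int)
    case class OffsetAndMetadata(offset: Long, metadata: String = "")

    class OffsetCache(offsetsTopicPartitionCount: Int) {
      private val cache = new ConcurrentHashMap[GroupTopicPartition, OffsetAndMetadata]()

      // All offsets of a group land in one partition of __offsets: the group is the partition key.
      def partitionFor(group: String): Int =
        (group.hashCode & 0x7fffffff) % offsetsTopicPartitionCount

      // Called after a commit has been successfully appended to the __offsets log,
      // and also by the load loop when this broker becomes leader for an offsets partition.
      def put(key: GroupTopicPartition, value: OffsetAndMetadata): Unit =
        cache.put(key, value)

      def get(key: GroupTopicPartition): Option[OffsetAndMetadata] =
        Option(cache.get(key))

      // On becoming a follower: drop all entries belonging to this offsets partition.
      def clearPartition(offsetsPartition: Int): Unit = {
        val it = cache.keySet.iterator()
        while (it.hasNext)
          if (partitionFor(it.next().group) == offsetsPartition) it.remove()
      }

      // On becoming a leader: replay the partition's log in offset order. A live commit
      // that raced ahead may be overwritten by an older record here, but the scan will
      // eventually reach the more recent record in the log and restore it.
      def loadFromLog(recordsInLogOrder: Iterator[(GroupTopicPartition, OffsetAndMetadata)]): Unit =
        recordsInLogOrder.foreach { case (key, value) => put(key, value) }
    }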
Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
- The broker config should set offsets.backup.enabled=true.
- Upgrade the brokers to the latest jar. (Consumers still commit directly to ZooKeeper.)
- Start migrating the consumers over.
- Consumers will now start sending offset commits to the broker. Since the backup setting is enabled, offsets will also be committed to ZooKeeper. This is necessary while consumers are being migrated (see the sketch below).
- After _all_ consumers have moved over you can turn off the backup.
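A rough sketch of what the dual-commit ("backup") path on the broker could look like during migration. Only offsets.backup.enabled comes from the steps above; OffsetCommitHandler, commitToOffsetsTopic and commitToZooKeeper are hypothetical names for illustration, not classes in this patch.

    // Sketch only: illustrates the dual-commit behavior while consumers are being migrated.
    case class OffsetCommit(group: String, topic: String, partition: Int, offset: Long)

    class OffsetCommitHandler(backupToZookeeperEnabled: Boolean,            // from offsets.backup.enabled
                              commitToOffsetsTopic: OffsetCommit => Boolean,
                              commitToZooKeeper: OffsetCommit => Unit) {

      def handleCommit(commit: OffsetCommit): Boolean = {
        // The Kafka-based commit (append to __offsets, then update the cache) is the
        // source of truth; it succeeds or fails exactly like a produce request.
        val committedToKafka = commitToOffsetsTopic(commit)
        // While the backup is enabled, also write the offset to ZooKeeper so consumers
        // that still read offsets from ZK (or that get rolled back) see it.
        if (committedToKafka && backupToZookeeperEnabled)
          commitToZooKeeper(commit)
        committedToKafka
      }
    }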
I have made a number of preliminary comments as TODOs in the RB myself (i.e., as a note to myself and others reviewing).

Questions/comments for discussion:
- Should we explicitly disallow changes to the number of offsets topic partitions? This is necessary (or we should at least prompt with a warning) since changing the number of partitions would affect the partitioning strategy.
- Should we remove per-partition error codes for offset commits and use just a global error code for the entire request? I'm using compressed message sets for commits; i.e., the log append for a given commit will either fail entirely or succeed entirely. The OffsetCommitResponse contains per-partition error codes, so if the log append fails for any reason the same error code would apply to all partitions; i.e., it is sufficient to have a global error code. I think we currently have per-partition error codes because offset commit requests can include metadata for each offset: the per-partition error code is set to MetadataTooLarge if the metadata entry exceeds MaxMetadataLength. However, in this case I would prefer to just fail the entire request as opposed to doing partial commits (as I do in the current patch). Does anyone have thoughts on this?
- Error codes: right now I'm using existing error codes (with the exception of OffsetsLoading). It may be better to return more specific error codes, but I'm not sure it matters - the client-side implementation needs to check for _any_ error, and if any error exists (other than MetadataTooLarge) it should just retry the offset commit/fetch until it succeeds; i.e., the client should not really care about the actual error. If people have any strong preference on this let me know.
- Separation of APIs: Right now, the offset manager and the replica manager are intertwined, which is less than ideal. It is okay if the offset manager depends on the replica manager but not the other way around. Ideally, I would like to have KafkaApis hand off offset commit/fetch requests to the offset manager, which then handles them. However, the inter-dependence comes about due to the need to clear the offset cache on becoming a follower and to load offsets on becoming a leader. I think we can improve the separation as follows (a rough sketch of the resulting layering follows this list):
  - Don't optimistically load/clear offsets on a leader/follower transition. Instead, load offsets only when an offset fetch request arrives for a partition that has not been loaded yet.
  - The OffsetManager will need to maintain a Map[partition -> lastKnownLeaderEpoch] to determine whether to load offsets or not.
  - The above will eliminate the reference to OffsetManager from ReplicaManager. KafkaApis still needs to reference the OffsetManager and will need to create the offset commit message to append to the __offsets log.
  - We can actually avoid the need for KafkaApis to know about offset commit messages as well: in order to do that, we will need to create a "DurableLog" layer on top of LogManager and move all the purgatory stuff in there. The LogManager supports appends/reads from the local log but does not know anything about the replicas. Instead, we can have a DurableLog layer that depends on ReplicaManager and LogManager and contains the producer/fetch-request purgatories. OffsetManager would then depend on this DurableLog component. KafkaApis can hand off ProducerRequests and FetchRequests to the DurableLog layer directly, and hand off OffsetCommit/OffsetFetch requests to the OffsetManager, which in turn hands them off to the DurableLog layer.
- Is the above worth it? I'm not sure it is, especially if we are sticking to only one offset management implementation.
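For concreteness, here is a hypothetical interface sketch of that proposed layering; none of these traits or method signatures exist in the patch, and they only illustrate the dependency direction discussed above (KafkaApis -> OffsetManager -> DurableLog -> ReplicaManager/LogManager) plus the lazy, epoch-tracked load.

    // Hypothetical sketch only; not part of this patch.
    // DurableLog would wrap ReplicaManager + LogManager and own the producer/fetch
    // purgatories, so callers get replicated appends/reads without knowing about replicas.
    trait DurableLog {
      def append(topic: String, partition: Int, payload: Array[Byte], requiredAcks: Int): Boolean
      def read(topic: String, partition: Int, startOffset: Long, maxBytes: Int): Iterator[Array[Byte]]
    }

    // OffsetManager depends only on DurableLog (no reference from ReplicaManager back to it)
    // and loads an offsets partition lazily on first use instead of on leader transitions.
    class OffsetManager(durableLog: DurableLog, offsetsTopicPartitionCount: Int) {
      // partition -> last known leader epoch for which offsets have been loaded
      private val loadedEpochs = scala.collection.mutable.Map.empty[Int, Int]

      def commitOffsets(group: String, serializedCommit: Array[Byte]): Boolean =
        durableLog.append("__offsets", partitionFor(group), serializedCommit, requiredAcks = -1)

      def ensureLoaded(offsetsPartition: Int, currentLeaderEpoch: Int): Unit =
        if (!loadedEpochs.get(offsetsPartition).exists(_ == currentLeaderEpoch)) {
          // Replay the partition from the DurableLog into the offsets cache (cache omitted here).
          durableLog.read("__offsets", offsetsPartition, startOffset = 0L, maxBytes = 5 * 1024 * 1024)
          loadedEpochs(offsetsPartition) = currentLeaderEpoch
        }

      private def partitionFor(group: String): Int =
        (group.hashCode & 0x7fffffff) % offsetsTopicPartitionCount
    }

    // KafkaApis would hand ProducerRequests/FetchRequests to DurableLog directly,
    // and OffsetCommit/OffsetFetch requests to OffsetManager.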
Diffs (updated)
-----

  config/server.properties c9e923a
  core/src/main/scala/kafka/admin/TopicCommand.scala dc9b092
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c
  core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532
  core/src/main/scala/kafka/api/RequestKeys.scala c81214f
  core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b
  core/src/main/scala/kafka/cluster/Partition.scala 882b6da
  core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala PRE-CREATION
  core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce
  core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala PRE-CREATION
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3
  core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala PRE-CREATION
  core/src/main/scala/kafka/common/Topic.scala c1b9f65
  core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6
  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 13c3f77
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 098d6e4
  core/src/main/scala/kafka/consumer/TopicCount.scala e332633
  core/src/main/scala/kafka/consumer/TopicFilter.scala 4f20823
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2
  core/src/main/scala/kafka/controller/KafkaController.scala 4deff9d
  core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala PRE-CREATION
  core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a
  core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31
  core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java c45c803
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 58e83f6
  core/src/main/scala/kafka/producer/ProducerConfig.scala 7947b18
  core/src/main/scala/kafka/server/KafkaApis.scala 215ac36
  core/src/main/scala/kafka/server/KafkaConfig.scala b871843
  core/src/main/scala/kafka/server/KafkaServer.scala feb2093
  core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION
  core/src/main/scala/kafka/server/ReplicaManager.scala fb759d9
  core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c
  core/src/main/scala/kafka/tools/DumpLogSegments.scala 14f44d9
  core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 5e8c56d
  core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION
  core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala eac9af2
  core/src/main/scala/kafka/utils/Utils.scala a89b046
  core/src/main/scala/kafka/utils/VerifiableProperties.scala b070bb4
  core/src/main/scala/kafka/utils/ZkUtils.scala a198628
  core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca
  core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala dbe078c
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala eb274d1
  core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala cf2724b
  core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 8fe7259
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c
  system_test/mirror_maker/README da53c14
  system_test/mirror_maker/bin/expected.out 0a1bbaf
  system_test/mirror_maker/bin/run-test.sh e5e6c08
  system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015
  system_test/mirror_maker/config/mirror_producer.properties aa8be65
  system_test/mirror_maker/config/server_source_1_1.properties 2f070a7
  system_test/mirror_maker/config/server_source_1_2.properties f9353e8
  system_test/mirror_maker/config/server_source_2_1.properties daa01ad
  system_test/mirror_maker/config/server_source_2_2.properties be6fdfc
  system_test/mirror_maker/config/server_target_1_1.properties d37955a
  system_test/mirror_maker/config/server_target_1_2.properties aa7546c
  system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015
  system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b
  system_test/mirror_maker/config/zookeeper_source_1.properties f851796
  system_test/mirror_maker/config/zookeeper_source_2.properties d534d18
  system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1
  system_test/offset_management_testsuite/cluster_config.json PRE-CREATION
  system_test/offset_management_testsuite/config/console_consumer.properties PRE-CREATION
  system_test/offset_management_testsuite/config/server.properties PRE-CREATION
  system_test/offset_management_testsuite/config/zookeeper.properties PRE-CREATION
  system_test/offset_management_testsuite/offset_management_test.py PRE-CREATION
  system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties PRE-CREATION
  system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json PRE-CREATION
  system_test/utils/kafka_system_test_utils.py 5d2b7df
  system_test/utils/testcase_env.py bee8716

Diff: https://reviews.apache.org/r/18022/diff/


Testing
-------


Thanks,

Joel Koshy