> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java, lines 60-68
> > <https://reviews.apache.org/r/32147/diff/7/?file=922539#file922539line60>
> >
> >     Actually, we do not need the consumer after bootstrapping, right? Do we stop it?
>
> Naveen Somasundaram wrote:
>     We use the same consumer for both the checkpoint and the changelog. The checkpoint consumer needs to live for the entire lifespan of the job, so the coordinator consumer will also live for the entire lifespan of the job (same reasoning as in the comment on why we keep bootstrapping each time).
>
> Yan Fang wrote:
>     All right, got a little confused now.
>     * Q1: Do we use the same coordinatorStream for both checkpoint and changelog?
>     * Q2: Does this consumption happen in the container or in the AM?
>     * Q3: When do we need the consumer? Only when starting the container, or at some other time as well?

Your understanding is correct: the coordinator stream doesn't store the changelog itself, only the mapping. The changelog is still a separate stream.

Q1: That's correct. The coordinator stream is a single stream for all Samza "meta" information (Checkpoint/ChangelogMapping/Configs). All of them are present in the same stream, and the coordinatorMessage type is used to distinguish them.

Q2: The consumption happens in the AM. The AM generates the JobModel, which is served to the containers when they start. The containers, however, only write to the coordinator stream (checkpoint messages only) and never read from it.

Q3: We need a consumer to read all checkpoints, changelog mappings, and configs. Since the AM has to keep reading checkpoints all the time (to serve the latest ones to the containers), the consumer is needed for pretty much the entire lifecycle of the job.

I think a lot of this will become clearer once I finish writing the documentation with diagrams.

- Naveen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review81995
-----------------------------------------------------------


On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
>
> (Updated April 10, 2015, 3:13 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-465
>
>
> Diffs
> -----
>
> build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e
> samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
> samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
> samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
> samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
> samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
> samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
> samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
> samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 5416dd620b2f65ffb09cf5f8c07b1a547df82bab
> samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
> samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
> samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
> samza-core/src/main/scala/org/apache/samza/util/Util.scala
> 1a67586eeec95dabfeb3b6881af9b3865c3029ca
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
> samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cab31ca6452e45c73808f20b12d39a30c117119a
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
> samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
> 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala c9504ecf1d1edbc116ca6d794678062fddeee7fa
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7
> samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala f783c5790f442928ea83b13359ac6b2a5bcb02e5
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala c84ceb75b84eb6d2ce115396ba54cf9e455d905c
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 0380d35121f4feb99efc7d092b4232030d12db01
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java d3f25c0e03a727e64a774581384ef5aae9ef9c1c
> samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
> samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
> samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
> samza-test/src/main/config/perf/kafka-read-write-performance.properties 122b14aaecc2d221ac8944d04d508e7f83ede5ab
> samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
> samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
> samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
>
> Diff: https://reviews.apache.org/r/32147/diff/
>
>
> Testing
> -------
>
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a single entity that uses the coordinator stream to do the checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component called the changelog manager handles it.
> 3. Passes the checkpoint information to the containers from the JobCoordinator.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the starting offsets from the JobCoordinator.
>
>
> Tests:
> All existing unit tests and Zopkio tests pass. I have changed some of the existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (a stub is present in TestJobCoordinator).
>
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the right reference to it.
> 2. The coordinator stream does not use the same consumer and producer from the "systemconsumers" and "systemproducers" we have (this will be done after SAMZA-567).
> 3. Remove checkpoint configs related to the stream, migrate them to the coordinator stream, and document them.
>
>
> Thanks,
>
> Naveen Somasundaram
>
>
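
For readers following the Q1/Q3 answers in this thread, here is a rough sketch of the idea that a single stream carries several kinds of "meta" messages, distinguished by a type field, and that a reader bootstraps by replaying the whole stream and keeping the latest value per key. This is only an illustration under stated assumptions: MetaMessage, CoordinatorStreamView, and CoordinatorStreamDemo are hypothetical names, an in-memory list stands in for the stream, and none of this is the actual CoordinatorStreamMessage or CoordinatorStreamSystemConsumer code from the patch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one message on the coordinator stream.
// The type field plays the role of the message type described in Q1.
final class MetaMessage {
    enum Type { CONFIG, CHECKPOINT, CHANGELOG_MAPPING }

    final Type type;
    final String key;
    final String value;

    MetaMessage(Type type, String key, String value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical view a reader (the AM in this thread) could build from the stream.
final class CoordinatorStreamView {
    // One key/value map per message type; later messages overwrite earlier ones.
    private final Map<MetaMessage.Type, Map<String, String>> latest =
            new EnumMap<>(MetaMessage.Type.class);

    // Route a message to the map for its type.
    void apply(MetaMessage message) {
        latest.computeIfAbsent(message.type, t -> new HashMap<>())
              .put(message.key, message.value);
    }

    // Replay every message in order, as a bootstrap from the start of the stream would.
    static CoordinatorStreamView bootstrap(List<MetaMessage> stream) {
        CoordinatorStreamView view = new CoordinatorStreamView();
        for (MetaMessage message : stream) {
            view.apply(message);
        }
        return view;
    }

    // Returns the most recent value for the key, or null if none was seen.
    String get(MetaMessage.Type type, String key) {
        return latest.getOrDefault(type, Collections.emptyMap()).get(key);
    }
}

class CoordinatorStreamDemo {
    public static void main(String[] args) {
        // An in-memory list stands in for the stream; keys and values are made up.
        List<MetaMessage> stream = Arrays.asList(
                new MetaMessage(MetaMessage.Type.CONFIG, "job.name", "demo"),
                new MetaMessage(MetaMessage.Type.CHECKPOINT, "task-0", "offset-10"),
                new MetaMessage(MetaMessage.Type.CHANGELOG_MAPPING, "task-0", "0"),
                new MetaMessage(MetaMessage.Type.CHECKPOINT, "task-0", "offset-25"));

        CoordinatorStreamView view = CoordinatorStreamView.bootstrap(stream);
        System.out.println(view.get(MetaMessage.Type.CHECKPOINT, "task-0"));
    }
}
```

In this sketch a long-lived reader would keep calling apply() as new checkpoint messages arrive, which is the reason given in Q3 for keeping the consumer alive for the whole job. The changelog data itself never appears here, only the task-to-partition mapping, which matches the point that the changelog remains a separate stream.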