> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java, line 149
> > <https://reviews.apache.org/r/32147/diff/7/?file=922534#file922534line149>
> >
> > Will this bootstrappedStreamSet overflow or take a lot of memory? We are putting all the messages in the stream into this set, and once we consider the checkpoint messages, it can be huge.
>
> Naveen Somasundaram wrote:
> It is true that we'll have multiple checkpoint messages. The reasons why it won't be big:
> 1. The coordinator stream will be log compacted.
> 2. The HashSet reads from the oldest to the latest, and if there are duplicates, they will be overwritten in the HashSet. That is, the set should only have the latest checkpoint (one per task); all the previous checkpoints will be overwritten as you read newer ones.
>
> Yan Fang wrote:
> "2. The HashSet reads from the oldest to the latest, and if there are duplicates, they will be overwritten in the HashSet. That is, the set should only have the latest checkpoint (one per task); all the previous checkpoints will be overwritten as you read newer ones."
>
> Why is this? From the code,
> ```
> CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
> bootstrappedStreamSet.add(coordinatorStreamMessage);
> ```
> it keeps adding new coordinatorStreamMessage instances.
>
> Naveen Somasundaram wrote:
> Yeah, it's a HashSet though: when the SystemConsumer reads the messages (from the oldest offset; see the register method), the new message with the same key will overwrite the old one in the HashSet. But I just looked at the equals method that the HashSet relies on (CoordinatorStreamMessage's equals method):
> ```
> @Override
> public boolean equals(Object obj) {
>   if (this == obj)
>     return true;
>   if (obj == null)
>     return false;
>   if (getClass() != obj.getClass())
>     return false;
>   CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
>   if (isDelete != other.isDelete)
>     return false;
>   if (!Arrays.equals(keyArray, other.keyArray))
>     return false;
>   if (messageMap == null) {
>     if (other.messageMap != null)
>       return false;
>   } else if (!messageMap.equals(other.messageMap))
>     return false;
>   return true;
> }
> ```
>
> It also uses the messageMap in equals, so the checkpoints will not be compacted in the code (even though they will be log compacted in Kafka). I will have to override the equals method for CheckpointMessage and ChangelogMessage. I'll make this change along with the doc. Good catch, Yan!
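A minimal Java sketch of the difference under discussion, assuming hypothetical `SketchMessage` and `Bootstrap` classes that stand in for `CoordinatorStreamMessage` and the bootstrap loop (they are not the Samza classes, and `isDelete` is omitted). With value-inclusive equality, as in the quoted `equals`, a `HashSet` retains every distinct checkpoint for a task. Note also that `HashSet.add` leaves an already-present equal element in place and returns `false`, so a key-only `equals` on its own would keep the oldest message rather than the latest; a map keyed on the key, where `put` replaces the previous value, gives latest-wins behavior.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for CoordinatorStreamMessage: a key array plus a value map.
final class SketchMessage {
  final String[] keyArray;
  final Map<String, Object> messageMap;

  SketchMessage(String[] keyArray, Map<String, Object> messageMap) {
    this.keyArray = keyArray;
    this.messageMap = messageMap;
  }

  // Mirrors the quoted equals(): key AND value map must match, so two
  // checkpoints for the same task with different offsets are not equal.
  @Override
  public boolean equals(Object obj) {
    if (this == obj) return true;
    if (obj == null || getClass() != obj.getClass()) return false;
    SketchMessage other = (SketchMessage) obj;
    return Arrays.equals(keyArray, other.keyArray)
        && Objects.equals(messageMap, other.messageMap);
  }

  @Override
  public int hashCode() {
    return 31 * Arrays.hashCode(keyArray) + Objects.hashCode(messageMap);
  }
}

final class Bootstrap {
  // Value-based equality: every distinct message read from the stream is retained.
  static Set<SketchMessage> intoSet(List<SketchMessage> log) {
    return new HashSet<>(log);
  }

  // Key-based dedup: reading oldest to newest, put() replaces the earlier
  // message for the same key, so only the latest one per key remains.
  static Map<List<String>, SketchMessage> latestByKey(List<SketchMessage> log) {
    Map<List<String>, SketchMessage> latest = new LinkedHashMap<>();
    for (SketchMessage m : log) {
      // Copy the array so later mutation cannot change the map key.
      latest.put(Arrays.asList(m.keyArray.clone()), m);
    }
    return latest;
  }
}
```

Keying on `Arrays.asList(...)` works because `List` implements value-based `equals` and `hashCode`, whereas a raw `String[]` key would be compared by identity.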
Sounds good. Thanks for the explanation.

> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java, lines 60-68
> > <https://reviews.apache.org/r/32147/diff/7/?file=922539#file922539line60>
> >
> > Actually, we do not need the consumer after bootstrapping, right? Do we stop it?
>
> Naveen Somasundaram wrote:
> We use the same consumer for both the checkpoint and the changelog. The checkpoint consumer needs to live for the entire lifespan of the job, hence the coordinator consumer will live for the entire lifespan of the job (same logic as commented for why we keep bootstrapping each time).
>
> Yan Fang wrote:
> All right, got a little confused now.
> * Q1: Do we use the same coordinatorStream for both checkpoint and changelog?
> * Q2: Does this consumption happen in the container or the AM?
> * Q3: When do we need the consumer? Only when starting the container, or at other times as well?
>
> Naveen Somasundaram wrote:
> Your understanding is correct: it doesn't store the changelog itself, only the mapping. The changelog is still a separate stream.
> Q1: That's correct. The coordinator stream is a single stream for all Samza "meta" information. Checkpoints, changelog mappings, and configs are all present in the same stream, and the coordinator message type is used to distinguish them.
> Q2: The consumption happens in the AM, and the AM generates the JobModel, which is served to the containers when they start. However, the container writes (never reads) to the coordinatorstream (only the checkpoint messages).
> Q3: We need a consumer to read all checkpoints, changelog mappings, and configs. Since the AM needs to keep reading checkpoints all the time (to serve the latest to the containers), we need it for pretty much the entire lifecycle of the job.
>
> I think a lot of it will become clearer when I finish writing the documentation with diagrams and stuff.
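To illustrate Q1, here is a hedged Java sketch of how a single consumer on the coordinator stream might route each message by its type into separate in-memory views. `MetaMessage`, `CoordinatorView`, and the type strings are hypothetical names chosen for this example; the real `CoordinatorStreamMessage` types and payloads in the patch may differ.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical coordinator-stream message: a type tag, a key, and a value.
final class MetaMessage {
  final String type;
  final String key;
  final String value;

  MetaMessage(String type, String key, String value) {
    this.type = type;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical AM-side view: one consumer reads the whole stream, and the
// type tag decides which in-memory map each message updates.
final class CoordinatorView {
  // Illustrative type tags only; the patch's real type strings may differ.
  static final String CONFIG = "set-config";
  static final String CHECKPOINT = "set-checkpoint";
  static final String CHANGELOG = "set-changelog";

  final Map<String, String> configs = new HashMap<>();
  final Map<String, String> checkpoints = new HashMap<>();
  final Map<String, String> changelogMapping = new HashMap<>();

  void apply(MetaMessage m) {
    switch (m.type) {
      case CONFIG:
        configs.put(m.key, m.value);
        break;
      case CHECKPOINT:
        checkpoints.put(m.key, m.value); // a newer checkpoint replaces the older one
        break;
      case CHANGELOG:
        changelogMapping.put(m.key, m.value);
        break;
      default:
        break; // unknown types are ignored in this sketch
    }
  }

  void applyAll(List<MetaMessage> messages) {
    for (MetaMessage m : messages) {
      apply(m);
    }
  }
}
```

Because every view is fed by the same consumer, keeping the latest checkpoints current (Q3) means that consumer has to keep running in the AM for as long as the job does.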
"Q2: The consumption happens in the AM, and the AM generates the JobModel, which is served to the containers when they start. However, the container writes (never reads) to the coordinatorstream (only the checkpoint messages)."

Yeah, this is what I thought. However, if you check the changes in SamzaContainer.scala, we are starting the consumer, which is not necessary:
```
val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
-->
val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
-->
val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
-->
offsetManager.start
-->
initializeCheckpointManager
-->
coordinatorStreamProducer.start();
coordinatorStreamConsumer.start();
```

> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala, lines 83-92
> > <https://reviews.apache.org/r/32147/diff/7/?file=922552#file922552line83>
> >
> > Will the old config be overwritten when we produce the new config?
>
> Naveen Somasundaram wrote:
> Correct, this is the code that deletes the delta (difference between the old and new config):
> ```
> (oldConfig.keySet -- config.keySet).foreach(key => {
>   coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
> })
> ```
>
> Yan Fang wrote:
> What I mean is: will the old config be overridden by the new config? For example, we have config foo=value1 in the stream. The new producer reads the config file, which has foo=value2, and then calls "writeConfig" to the coordinator stream. Will foo=value1 be replaced by foo=value2, or will `val oldConfig = coordinatorSystemConsumer.getConfig();` get "foo=value1" instead?
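The delete-the-delta step quoted above can be sketched in Java against an append-only log, assuming a hypothetical `ConfigLog` class in which a `null` value stands for the `Delete` message (these are not Samza APIs). `submit` first appends the new config, then replays the whole log into a consolidated map where later entries win, and finally appends a delete for every key the new config no longer contains.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the JobRunner config update against an append-only log.
final class ConfigLog {
  // One log entry: a config key and its value; a null value is a delete marker.
  static final class Entry {
    final String key;
    final String value;

    Entry(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  final List<Entry> log = new ArrayList<>();

  // Replay from the oldest entry: later entries overwrite, delete markers remove.
  Map<String, String> consolidated() {
    Map<String, String> view = new LinkedHashMap<>();
    for (Entry e : log) {
      if (e.value == null) {
        view.remove(e.key);
      } else {
        view.put(e.key, e.value);
      }
    }
    return view;
  }

  // Write the new config first, then delete keys that are no longer present,
  // mirroring (oldConfig.keySet -- config.keySet).foreach(... Delete ...).
  void submit(Map<String, String> newConfig) {
    for (Map.Entry<String, String> kv : newConfig.entrySet()) {
      log.add(new Entry(kv.getKey(), kv.getValue()));
    }
    Set<String> stale = new HashSet<>(consolidated().keySet());
    stale.removeAll(newConfig.keySet());
    for (String key : stale) {
      log.add(new Entry(key, null));
    }
  }
}
```

Submitting `{foo=value1, bar=value4}` and then `{foo=value2}` leaves the log holding foo=value1, bar=value4, foo=value2, bar=null, and the consolidated view is just foo=value2.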
>
> Naveen Somasundaram wrote:
> oldConfig will get both keys, but since we use a map (CoordinatorStreamSystemConsumer:151-156), we will have the latest value. The order of events in Kafka:
>
> #User adds the config (foo=value1, bar=value4) and starts the JobRunner
> #JobRunner writes it to Kafka using the producer
> #Kafka has:
> foo=value1
> bar=value4
> #User now decides to add the new config foo=value2 and removes bar=value4
> #User starts the JobRunner; the new producer writes the new config (line 79)
> #Kafka has:
> foo=value1
> bar=value4
> foo=value2
> #Now we read everything from Kafka (line 85)
> oldConfig = foo->value2, bar->value4
> newConfig = foo->value2
> #Keys to be deleted
> (oldConfig -- newConfig) = bar=value4
> #Kafka has:
> foo=value1
> bar=value4
> foo=value2
> bar=null (marked for compaction)
>
> The config seen by the AM:
> foo=value2
>
> I think oldConfig is a poor name for the variable; it should really have been called consolidatedConfig. That would make it much more readable. I'll make this change.

This makes sense. Agreed on this change. I think this is why I was misled.

- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review81995
-----------------------------------------------------------


On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
>
> (Updated April 10, 2015, 3:13 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA 465
>
>
> Diffs
> -----
>
> build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e
> samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java 092cb910b40d312217e86420bf1ddfbaf605e9e5
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java a97ff0919d8205928efee1a2a20780659180849d
> samza-api/src/main/java/org/apache/samza/container/TaskName.java 083358686fc69ab45bbc73e898f419224ebc3a9f
> samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java 8995ba30c823bddcdfd3af7100e1440e71ef7998
> samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java 01997ae22641b735cd452a0e89a49219e2874892
> samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java eb22d2ec5f09ca59790e2871d9bff9745fe925dc
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java 3517912eaafbf95f8c8cc70ab5869548a56b76e7
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala a40c87fa7865746a5612c55a4cf24c8d005be7e0
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala 2a87a6e0cef72179b5383fc824266de1f9606293
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 3b6685e00837a4aaf809813e62b7e52823bc07a9
> samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
> samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
> samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 1ca9e2cc5673c537b6a48224809847e94da81fca
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 5416dd620b2f65ffb09cf5f8c07b1a547df82bab
> samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala c14f2f623bb4bae911dd3085ce428175930e4545
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala 10986a49b39cda703a0e54688dc914f2465c79c9
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala 635c3531a897a369c813821f7b901186e1281ed1
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b80d34953a54ada461ed1d4b0dcfa07f4435f877
> samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 530255e5866bc49ec5ce1a0b7437470cd4e17010
> samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala 744eec05857a4ea14c718e3750fb575d3678b1f8
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ec1d749cb5186f788b402877996a4caa37e99362
> samza-core/src/main/scala/org/apache/samza/util/Util.scala 1a67586eeec95dabfeb3b6881af9b3865c3029ca
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java PRE-CREATION
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
> samza-core/src/test/resources/test.properties 9348c7de956ebf02f58a163dc6fb391a7e29ae64
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala af800dfeedbfea75abaac3f15fd53bc55b743daf
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala d18d4c45c5de3b50a24d6c776364e1f589db8f4d
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala 10ff1f437220b38810f8a32903cc72df20f206d3
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala cab31ca6452e45c73808f20b12d39a30c117119a
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala a8e5d36921464a2e36693279e8083e4544c4e289
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
> samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala c9504ecf1d1edbc116ca6d794678062fddeee7fa
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala 3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7
> samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 02a6ea94daf9eb597c9ecef5d63062df5861efc8
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala a34c3f2738855dbf3737639c33846fcad23bd3b9
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala f783c5790f442928ea83b13359ac6b2a5bcb02e5
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala c84ceb75b84eb6d2ce115396ba54cf9e455d905c
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b76d5ad68640908bef552125d405b467386025f8
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala 7d4bea8398794c2325f9c022074303a83cfb164a
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala 0380d35121f4feb99efc7d092b4232030d12db01
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java d3f25c0e03a727e64a774581384ef5aae9ef9c1c
> samza-test/src/main/config/join/common.properties ad10aac090beb072ecce3546f06279a7a6113970
> samza-test/src/main/config/negate-number.properties b9f898c745250252461c833adb05e24ece2d4a89
> samza-test/src/main/config/perf/container-performance.properties 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
> samza-test/src/main/config/perf/kafka-read-write-performance.properties 122b14aaecc2d221ac8944d04d508e7f83ede5ab
> samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
> samza-test/src/main/python/samza_failure_testing.py 198db26528cab8b473f794a922848a60299dc825
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
> samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala 03395e2efa0fec723e354177d06bfacf7d2a9215
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala a1dbe0435ae08c710d4bfc871458ed386e275cd2
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b0b6543856cb87888c5a719182ad9576b51bba1a
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala 24b11da06a69da734c85720ef39d65ee46d821d5
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala 58f2464211a1fb7ff40f5978fd41f64d088002d0
>
> Diff: https://reviews.apache.org/r/32147/diff/
>
>
> Testing
> -------
>
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with a single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component, the changelog manager, handles it.
> 3. Passes the checkpoint information from the job coordinator to the containers.
> 4. Modifies the offset manager to use the new checkpoint manager and the starting offsets from the job coordinator.
>
>
> Tests:
> All existing unit tests and Zopkio tests pass. I have changed some of the existing unit tests as well.
> I plan to add one more unit test to verify checkpoint persistence (a stub is present in TestJobCoordinator).
>
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the right reference to it.
> 2. The coordinator stream does not use the same consumer and producer from the "systemconsumers" and "systemproducers" we have (this will be done after SAMZA-567).
> 3. Remove checkpoint configs related to the stream, migrate them to the coordinator stream, and document them.
>
>
> Thanks,
>
> Naveen Somasundaram
>
>