> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java,
> > line 149
> > <https://reviews.apache.org/r/32147/diff/7/?file=922534#file922534line149>
> >
> > Will this bootstrappedStreamSet overflow or take a lot of memory?
> > We are putting all the messages in the stream into this set, and when we
> > consider the checkpoint messages, it can be huge.
>
> Naveen Somasundaram wrote:
> It is true that we'll have multiple checkpoint messages, but there are two
> reasons why the set won't be big:
> 1. The coordinator stream will be log compacted.
> 2. The HashSet reads from the oldest to the latest, and if there are
> duplicates, they will be overwritten in the HashSet. That is, the Set should only
> have the latest checkpoint (one per task); all the previous checkpoints will
> be overwritten as you are reading newer ones.
>
> Yan Fang wrote:
> "2. The HashSet reads from the oldest to the latest, and if there are
> duplicates, they will be overwritten in the HashSet. That is, the Set should only
> have the latest checkpoint (one per task); all the previous checkpoints will
> be overwritten as you are reading newer ones."
>
> Why is this? From the code,
> ```
> CoordinatorStreamMessage coordinatorStreamMessage =
>     new CoordinatorStreamMessage(keyArray, valueMap);
> bootstrappedStreamSet.add(coordinatorStreamMessage);
> ```
> It keeps adding new coordinatorStreamMessage objects.
Yeah, it's a HashSet, though. The SystemConsumer reads messages starting from the
oldest offset (see the register method), and the intent is that a newer message
with the same key replaces the older one in the HashSet. But I just looked at the
equals method that the HashSet relies on (CoordinatorStreamMessage's equals method):
```
@Override
public boolean equals(Object obj) {
  if (this == obj)
    return true;
  if (obj == null)
    return false;
  if (getClass() != obj.getClass())
    return false;
  CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
  if (isDelete != other.isDelete)
    return false;
  if (!Arrays.equals(keyArray, other.keyArray))
    return false;
  if (messageMap == null) {
    if (other.messageMap != null)
      return false;
  } else if (!messageMap.equals(other.messageMap))
    return false;
  return true;
}
```
It also compares messageMap in equals, so checkpoint messages for the same task
with different offsets are not equal and will not be deduplicated in the code
(even though they will be log compacted in Kafka). I will have to override the
equals method for CheckpointMessage and ChangelogMessage. I'll make this change
along with the doc. Good catch, Yan!
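A minimal sketch of key-only deduplication, assuming a simplified message type: `Msg`, `bootstrapAddOnly`, and `bootstrapLatestWins` are hypothetical names for illustration, not Samza classes, and the real message's `isDelete` flag is left out. It also shows a subtlety worth checking once equals is overridden: `java.util.HashSet.add` leaves the set unchanged when an equal element is already present, so with key-only equality a plain `add` keeps the oldest checkpoint, not the newest. Removing the equal element first (or keying a `Map` by the message key) lets the latest message win. An `equals` override also needs a matching `hashCode`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class KeyedDedupSketch {
    // Hypothetical, simplified stand-in for CoordinatorStreamMessage: a key
    // array plus an opaque value (for example, a serialized checkpoint).
    static final class Msg {
        final String[] keyArray;
        final String value;

        Msg(String[] keyArray, String value) {
            this.keyArray = keyArray;
            this.value = value;
        }

        // Equality ignores the value, so two checkpoints for the same task
        // compare equal even when their offsets differ.
        @Override
        public boolean equals(Object obj) {
            if (this == obj) return true;
            if (obj == null || getClass() != obj.getClass()) return false;
            return Arrays.equals(keyArray, ((Msg) obj).keyArray);
        }

        // hashCode must agree with equals for HashSet lookups to work.
        @Override
        public int hashCode() {
            return Arrays.hashCode(keyArray);
        }
    }

    // Plain add(): an equal element already in the set is left in place,
    // so the OLDEST message per key survives.
    static Set<Msg> bootstrapAddOnly(List<Msg> oldestFirst) {
        Set<Msg> set = new HashSet<>();
        for (Msg m : oldestFirst) {
            set.add(m);
        }
        return set;
    }

    // remove() then add(): the previous element with the same key is dropped
    // first, so the NEWEST message per key survives.
    static Set<Msg> bootstrapLatestWins(List<Msg> oldestFirst) {
        Set<Msg> set = new HashSet<>();
        for (Msg m : oldestFirst) {
            set.remove(m);
            set.add(m);
        }
        return set;
    }

    public static void main(String[] args) {
        List<Msg> log = Arrays.asList(
            new Msg(new String[] {"checkpoint", "task-0"}, "offset=10"),
            new Msg(new String[] {"checkpoint", "task-0"}, "offset=42"));
        System.out.println(bootstrapAddOnly(log).iterator().next().value);    // offset=10
        System.out.println(bootstrapLatestWins(log).iterator().next().value); // offset=42
    }
}
```

Because the consumer replays from the oldest offset, the remove-then-add variant ends up holding the same thing Kafka log compaction retains: the last message per key.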
> On April 29, 2015, 7:30 p.m., Yan Fang wrote:
> > samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala, lines 83-92
> > <https://reviews.apache.org/r/32147/diff/7/?file=922552#file922552line83>
> >
> > Will the old config be overwritten when we produce the new config?
>
> Naveen Somasundaram wrote:
> Correct. This is the code that deletes the delta (the keys that are in the
> old config but not in the new one):
> ```
> (oldConfig.keySet -- config.keySet).foreach(key => {
>   coordinatorSystemProducer.send(
>     new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
> })
> ```
>
> Yan Fang wrote:
> What I mean is: will the old config be overridden by the new config? For
> example, say we have the config foo=value1 in the stream. The new producer reads
> the config file, which has foo=value2, and then calls "writeConfig" to write it
> to the coordinator stream. Will foo=value1 be replaced by foo=value2, or will
> val oldConfig = coordinatorSystemConsumer.getConfig(); still get "foo=value1"?
oldConfig will see both messages for foo, but since we use a map
(CoordinatorStreamSystemConsumer:151-156), it ends up with the latest value.
Here is the order of events in Kafka:
#User adds the config (foo=value1, bar=value4) and starts the JobRunner
#JobRunner writes the config to Kafka using the producer
#Kafka has:
foo=value1
bar=value4
#User now changes the config to foo=value2 and removes bar=value4
#User starts the JobRunner again; the new producer writes the new config (line 79)
#Kafka has:
foo=value1
bar=value4
foo=value2
#Now we read everything back from Kafka (line 85)
oldConfig = foo->value2, bar->value4
newConfig = foo->value2
#Keys to be deleted
(oldConfig.keySet -- newConfig.keySet) = bar
#Kafka has:
foo=value1
bar=value4
foo=value2
bar=null (marked for compaction)
The config seen by the AM:
foo=value2
I think oldConfig is a poor name for the variable; it should really have been
called consolidatedConfig. That would make it much more readable, so I'll make
this change.
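The walkthrough above can be replayed with a small Java sketch, assuming a simplified log of key/value entries in which a null value stands in for the Delete message. `Entry`, `consolidate`, and `writeAndPruneConfig` are hypothetical names for illustration, not the JobRunner or CoordinatorStreamSystemConsumer API; the map-based replay mirrors how the consumer keeps only the latest value per key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ConfigDeltaSketch {
    // Hypothetical log entry: a config key and its value; a null value
    // stands in for a delete (tombstone) message.
    static final class Entry {
        final String key;
        final String value;

        Entry(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Replays the log oldest-first into a map: later writes to the same key
    // replace earlier ones, and a tombstone removes the key.
    static Map<String, String> consolidate(List<Entry> log) {
        Map<String, String> config = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.value == null) {
                config.remove(e.key);
            } else {
                config.put(e.key, e.value);
            }
        }
        return config;
    }

    // Appends the new config, reads back the consolidated config, then
    // appends a tombstone for every key that is not in the new config.
    static void writeAndPruneConfig(List<Entry> log, Map<String, String> newConfig) {
        for (Map.Entry<String, String> e : newConfig.entrySet()) {
            log.add(new Entry(e.getKey(), e.getValue()));
        }
        Set<String> toDelete = new HashSet<>(consolidate(log).keySet());
        toDelete.removeAll(newConfig.keySet());
        for (String key : toDelete) {
            log.add(new Entry(key, null));
        }
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();

        Map<String, String> first = new LinkedHashMap<>();
        first.put("foo", "value1");
        first.put("bar", "value4");
        writeAndPruneConfig(log, first);

        Map<String, String> second = new LinkedHashMap<>();
        second.put("foo", "value2");
        writeAndPruneConfig(log, second);

        System.out.println(consolidate(log)); // {foo=value2}
    }
}
```

Running main prints {foo=value2}, the config the AM sees at the end of the walkthrough, and the log ends with the same four entries listed there (foo=value1, bar=value4, foo=value2, and a tombstone for bar).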
- Naveen
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32147/#review81995
-----------------------------------------------------------
On April 10, 2015, 3:13 a.m., Naveen Somasundaram wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32147/
> -----------------------------------------------------------
>
> (Updated April 10, 2015, 3:13 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-465
>
>
> Diffs
> -----
>
> build.gradle 97de3a28f6379e3862eec845da87587b1d4f742e
> samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
> 092cb910b40d312217e86420bf1ddfbaf605e9e5
>
> samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
> a97ff0919d8205928efee1a2a20780659180849d
> samza-api/src/main/java/org/apache/samza/container/TaskName.java
> 083358686fc69ab45bbc73e898f419224ebc3a9f
> samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
> 8995ba30c823bddcdfd3af7100e1440e71ef7998
> samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
> 6ff1a555f3c48e416bb78e94c5df71eff0a27f3d
>
> samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
> 01997ae22641b735cd452a0e89a49219e2874892
> samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
> PRE-CREATION
>
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
> PRE-CREATION
>
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
> PRE-CREATION
>
> samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
> PRE-CREATION
> samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
> eb22d2ec5f09ca59790e2871d9bff9745fe925dc
>
> samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
> 7dc431c74a3fc2ba80eb47d6c5d87524cb4c9bde
>
> samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
> 3517912eaafbf95f8c8cc70ab5869548a56b76e7
>
> samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
> PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
> ddc30af7c52d8a4d5c5de02f6757c040b1f31c93
> samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
> a40c87fa7865746a5612c55a4cf24c8d005be7e0
>
> samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
> 2a87a6e0cef72179b5383fc824266de1f9606293
> samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
> 3b6685e00837a4aaf809813e62b7e52823bc07a9
> samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
> 1a2dd4413f56e53dbeeb47b5637d7b0c50522f02
> samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
> adef09e15c666cb2dbb2e4c5507fc2d605b82a1e
> samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
> 1ca9e2cc5673c537b6a48224809847e94da81fca
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
> 5416dd620b2f65ffb09cf5f8c07b1a547df82bab
> samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
> c14f2f623bb4bae911dd3085ce428175930e4545
>
> samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
> 10986a49b39cda703a0e54688dc914f2465c79c9
>
> samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
> 635c3531a897a369c813821f7b901186e1281ed1
>
> samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
> PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
> 0b720ec4dd83c71fd1ce5071571c7a10babf0ddc
>
> samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
> b80d34953a54ada461ed1d4b0dcfa07f4435f877
> samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
> 530255e5866bc49ec5ce1a0b7437470cd4e17010
> samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
> 744eec05857a4ea14c718e3750fb575d3678b1f8
>
> samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
> ec1d749cb5186f788b402877996a4caa37e99362
> samza-core/src/main/scala/org/apache/samza/util/Util.scala
> 1a67586eeec95dabfeb3b6881af9b3865c3029ca
>
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
> PRE-CREATION
>
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
> PRE-CREATION
>
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
> PRE-CREATION
>
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
> PRE-CREATION
>
> samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
> PRE-CREATION
>
> samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
> 76bc6810a3162a1dc58a36033b3b1f75616bd6ca
> samza-core/src/test/resources/test.properties
> 9348c7de956ebf02f58a163dc6fb391a7e29ae64
>
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
> af800dfeedbfea75abaac3f15fd53bc55b743daf
>
> samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
> d18d4c45c5de3b50a24d6c776364e1f589db8f4d
>
> samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
> 10ff1f437220b38810f8a32903cc72df20f206d3
>
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
> cab31ca6452e45c73808f20b12d39a30c117119a
>
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
> 1ee5c06fbd1be5e4ce944a16454c8bd32459d395
>
> samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
> a8e5d36921464a2e36693279e8083e4544c4e289
>
> samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
> 9a8e1450a6cc14713817f719cfa56a0e5c97a6f6
> samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
> 60460713a2d4f7b7b389f21c1450d45c1afaa0f4
>
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
> 5d8ee4fa74be9ba36956c11ae33573be2d2d5826
>
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
> c9504ecf1d1edbc116ca6d794678062fddeee7fa
>
> samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
> 3dfa26a23c88f9b5b8ffbfd64b59c4061a8ed2e7
> samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
> 02a6ea94daf9eb597c9ecef5d63062df5861efc8
>
> samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
> a34c3f2738855dbf3737639c33846fcad23bd3b9
>
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
> f783c5790f442928ea83b13359ac6b2a5bcb02e5
>
> samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
> c84ceb75b84eb6d2ce115396ba54cf9e455d905c
>
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
> b76d5ad68640908bef552125d405b467386025f8
>
> samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
> 7d4bea8398794c2325f9c022074303a83cfb164a
>
> samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
> 0380d35121f4feb99efc7d092b4232030d12db01
>
> samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
> d3f25c0e03a727e64a774581384ef5aae9ef9c1c
> samza-test/src/main/config/join/common.properties
> ad10aac090beb072ecce3546f06279a7a6113970
> samza-test/src/main/config/negate-number.properties
> b9f898c745250252461c833adb05e24ece2d4a89
> samza-test/src/main/config/perf/container-performance.properties
> 86b2d58e2d4a9d6bcc9bd305bb7d735f7d1dd053
> samza-test/src/main/config/perf/kafka-read-write-performance.properties
> 122b14aaecc2d221ac8944d04d508e7f83ede5ab
> samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
> c0a20af5a2f4329ad4a2cff378ced3bececbc1cb
> samza-test/src/main/python/samza_failure_testing.py
> 198db26528cab8b473f794a922848a60299dc825
>
> samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
> 45c76e86361b9e1a54cb5fc717126d36b64cf7e8
>
> samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
> d66b3bd070a4cef4b1d3dded1d79a33cbe3fa09b
> samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
> 03395e2efa0fec723e354177d06bfacf7d2a9215
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
> a1dbe0435ae08c710d4bfc871458ed386e275cd2
>
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
> 8ba435ef2ccf2af64d01eb4bc3b1c362fb03779d
>
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
> b0b6543856cb87888c5a719182ad9576b51bba1a
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
> 24b11da06a69da734c85720ef39d65ee46d821d5
>
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
> 765f72f4c10bd0f1d1adab28c8ec54d9cbea5fb4
>
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
> 81dea9d6d1921462b200c62dbdf016c0eb2f01b2
>
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
> 58f2464211a1fb7ff40f5978fd41f64d088002d0
>
> Diff: https://reviews.apache.org/r/32147/diff/
>
>
> Testing
> -------
>
> The patch does the following:
> 1. Removes all system-specific checkpoint managers and replaces them with
> a single entity that uses the coordinator stream for checkpointing.
> 2. Decouples the changelog from the checkpoint manager; a new component, the
> changelog manager, handles it.
> 3. Passes the checkpoint information from the JobCoordinator to the containers.
> 4. Modifies the OffsetManager to use the new checkpoint manager and the
> starting offsets from the JobCoordinator.
>
>
> Tests:
> All existing unit tests and Zopkio tests pass; I have also changed some of the
> existing unit tests.
> I plan to add one more unit test to verify checkpoint persistence (a stub is
> present in TestJobCoordinator).
>
> Pending Issues:
> 1. The metrics registry is not used correctly right now; I need to pass the
> right reference to it.
> 2. The coordinator stream does not use the same consumers and producers as the
> "systemconsumers" and "systemproducers" we have (this will be done after
> SAMZA-567).
> 3. Remove the checkpoint configs related to the stream, migrate them to the
> coordinator stream, and document them.
>
>
> Thanks,
>
> Naveen Somasundaram
>
>