> On July 8, 2015, 7:34 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala, line 46
> > <https://reviews.apache.org/r/35676/diff/3/?file=1001916#file1001916line46>
> >
> >     This reminds me of one thing: what if the job has both Kafka and
> >     Databus streams as inputs? The matching cases for Kafka will go ahead to
> >     migrate all Kafka offsets but the JobRunner will proceed w/o migrating the
> >     non-Kafka offsets. Won't it be better to abort here instead of proceeding
> >     w/ the migration? Hence, for jobs w/ mixed input streams, we will force the
> >     following safe sequence:
> >     1. run the checkpoint tool to migrate the offsets
> >     2. remove all checkpoint.factory configuration
> >     3. restart the job
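
To make the abort idea in the quoted comment concrete, here is a minimal sketch. It assumes a job declares a single checkpoint factory under a `task.checkpoint.factory` key. `MigrationGuard`, `Decision` and the Kafka factory class name are illustrative assumptions, not code from the patch under review.

```scala
// Hypothetical sketch: MigrationGuard and its members are illustrative names,
// not part of Samza. Only the decision logic is shown, no offsets are touched.
object MigrationGuard {
  // Assumed config key and factory class name; verify against your Samza version.
  val CheckpointFactoryKey = "task.checkpoint.factory"
  val KafkaFactory = "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory"

  sealed trait Decision
  case object NothingToMigrate extends Decision
  case object MigrateKafkaOffsets extends Decision
  final case class Abort(reason: String) extends Decision

  // Decide what the job runner should do before starting the job.
  def decide(config: Map[String, String]): Decision =
    config.get(CheckpointFactoryKey) match {
      // No checkpoint factory configured: there is nothing to migrate.
      case None => NothingToMigrate
      // Kafka checkpoints can be migrated automatically.
      case Some(KafkaFactory) => MigrateKafkaOffsets
      // Any other factory: stop instead of starting with unmigrated offsets.
      case Some(other) =>
        Abort(s"Cannot auto-migrate checkpoints written by $other; " +
          "run the checkpoint tool, remove the checkpoint factory config, then restart the job.")
    }
}
```

A caller would match on the returned `Decision` and fail the job submission on `Abort`, which pushes the operator through the three-step sequence instead of silently skipping the non-Kafka offsets.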

Correction to my previous point: there should be no mixed checkpoint factories, hence the point on the mixed input systems is invalid. But I would think that for a non-Kafka checkpoint system, it would still be safer to abort the migration and force the safe sequence I described above.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35676/#review90962
-----------------------------------------------------------


On July 8, 2015, 1:41 a.m., Naveen Somasundaram wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35676/
> -----------------------------------------------------------
>
> (Updated July 8, 2015, 1:41 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-615: Checkpoint migration
>
>
> Diffs
> -----
>
>   build.gradle a5f54106a822dc91ff82270df27217a8765a0d80
>   checkstyle/import-control.xml 3374f0c432e61ac4cda275377604cfd481f0cddf
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java 6c1e488d00d8593d59c89b57e673e0b6b90fd7d2
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java 92f8907f3cbd490ccafb2d963091a644604eb49b
>   samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 1c178a661e449c6bdfc4ce431aef9bb2d261a6c2
>   samza-core/src/main/scala/org/apache/samza/migration/JobRunnerMigration.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/migration/MigrationPlan.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java 647cadb3a4e51bec8204197d77ad35a6b29afcec
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java e454593ebd2b09f0cfb7a7531f05394348253e9b
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java ac26a015596688a1a9fe5a078ce506de64364938
>   samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala 52057edbbcd08b7d6edde7e898466d26534f52f6
>   samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointLogKey.scala PRE-CREATION
>   samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManager.scala PRE-CREATION
>   samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointManagerFactory.scala PRE-CREATION
>   samza-kafka/src/main/scala/old/checkpoint/KafkaCheckpointMigration.scala PRE-CREATION
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala 39c54aa4ee27736c0400d295904872ca6fa3fb88
>   samza-kafka/src/test/scala/old/checkpoint/TestKafkaCheckpointManager.scala PRE-CREATION
>   samza-test/src/main/config/join/common.properties ac87e81aeea4015b2fc83942a62e3d16c4fbbd2b
>
> Diff: https://reviews.apache.org/r/35676/diff/
>
>
> Testing
> -------
>
> Unit test added, test with hello-samza in progress
>
> Hello-samza test:
>
> OUTPUT FROM 0.9:
> ~/Documents/hello-samza(branch:latest) » deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic __samza_checkpoint_ver_1_for_wikipedia-parser_1 --from-beginning
>
> nsomasun@nsomasun-mn1
>
> {"Partition 0":0}
> {}
> {"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"1674","stream":"wikipedia-raw"}}
> {"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"1808","stream":"wikipedia-raw"}}
> {"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"1950","stream":"wikipedia-raw"}}
> {"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"2103","stream":"wikipedia-raw"}}
>
> ------------------------------------------------------------
>
> OUTPUT FROM 0.10.0:
>
> ~/Documents/hello-samza(branch:latest) » deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic __samza_coordinator_wikipedia-parser_1 --from-beginning
>
> nsomasun@nsomasun-mn1
> {"host":"172.21.136.130","source":"CHECKPOINTMIGRATION","values":{"kafka.wikipedia-raw.0":"2103"},"username":"nsomasun","timestamp":1435870948879}
>
>
> Thanks,
>
> Naveen Somasundaram
>
>
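
For anyone repeating the hello-samza check, the comparison between the two outputs quoted above can be sketched as follows. This is only an illustration: `MigrationOutputCheck` is a hypothetical helper, the two strings are copied from the console output in the review request, and the regex extraction is a shortcut that assumes this exact message shape rather than a proper JSON parse.

```scala
import scala.util.matching.Regex

// Hypothetical helper, not part of Samza or the patch: compares the last 0.9
// checkpoint with the value written to the 0.10.0 coordinator stream.
object MigrationOutputCheck {
  // Last checkpoint line from the 0.9 output quoted in the review request.
  val lastOldCheckpoint: String =
    """{"SystemStreamPartition [kafka, wikipedia-raw, 0]":{"system":"kafka","partition":"0","offset":"2103","stream":"wikipedia-raw"}}"""

  // CHECKPOINTMIGRATION message from the 0.10.0 output quoted in the review request.
  val coordinatorMessage: String =
    """{"host":"172.21.136.130","source":"CHECKPOINTMIGRATION","values":{"kafka.wikipedia-raw.0":"2103"},"username":"nsomasun","timestamp":1435870948879}"""

  // Regex shortcuts that assume the message shapes above.
  private val OldOffset: Regex = """offset":"(\d+)""".r
  private val Migrated: Regex = """kafka\.wikipedia-raw\.0":"(\d+)""".r

  // Return the first capture group of the first match, if any.
  private def capture(re: Regex, text: String): Option[String] =
    re.findFirstMatchIn(text).map(_.group(1))

  def oldOffset: Option[String] = capture(OldOffset, lastOldCheckpoint)
  def migratedOffset: Option[String] = capture(Migrated, coordinatorMessage)

  // True only when both offsets were found and they are equal.
  def offsetsMatch: Boolean = oldOffset.isDefined && oldOffset == migratedOffset
}
```

For the output quoted above, both sides resolve to offset 2103, so `MigrationOutputCheck.offsetsMatch` is true. That is the property the manual test is checking: the migrated value equals the last offset checkpointed under 0.9.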