-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review164701
-----------------------------------------------------------

samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
(line 33)
<https://reviews.apache.org/r/52476/#comment236461>

    nit: TimeUnit.DAYS.toMillis(1)


samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
(lines 170 - 171)
<https://reviews.apache.org/r/52476/#comment236464>

    The logic is structured oddly. We really shouldn't need to read the file just to determine whether it is present and when it was last modified.
    
    It seems like we need an "isStoreValid()" method which:
    1. Checks persistedStores.contains(storeName)
    2. Calls isStaleLoggedStore to verify the age
    3. Calls a new isOffsetFileValid method to verify the contents of the file
    
    and returns true only if all of the above checks succeed.


- Jake Maes


On Oct. 22, 2016, 10:06 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 22, 2016, 10:06 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed up by a Kafka changelog topic. Due to log
> compaction, delete tombstones in the changelog topic have a TTL of
> delete.retention.ms. Replaying events from a changelog that is missing
> delete tombstones would create an inconsistent local store (because some
> delete events are lost). This patch deletes, during container startup, any
> local store for which the difference between the current time and the last
> modified time of its offset file is greater than delete.retention.ms.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit testing and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>
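The isStoreValid() suggestion in the review above could look roughly like the following Scala sketch. It is not the code from the patch: the StoreValidator class, the injected clock, the StorageDefaults.DeleteRetentionMs constant, and the rule that a valid offset file is simply one with non-blank contents are all assumptions made for illustration. Only persistedStores, isStaleLoggedStore, isOffsetFileValid, isStoreValid, and delete.retention.ms come from the review itself. The constant also shows the TimeUnit.DAYS.toMillis(1) form from the StorageConfig nit, using one day (Kafka's default delete.retention.ms) as the value.

```scala
import java.io.{File, IOException}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.TimeUnit

// Hypothetical holder for the default: one day in milliseconds, matching
// Kafka's default delete.retention.ms, written with TimeUnit instead of a literal.
object StorageDefaults {
  val DeleteRetentionMs: Long = TimeUnit.DAYS.toMillis(1)
}

// Hypothetical helper grouping the three checks suggested in the review.
// The clock is injected (an assumption) so staleness can be tested deterministically.
class StoreValidator(
    persistedStores: Set[String],
    deleteRetentionMs: Long,
    clock: () => Long = () => System.currentTimeMillis()) {

  // Stale if the offset file was last modified more than deleteRetentionMs ago,
  // since delete tombstones older than that may already have been compacted away.
  // File.lastModified returns 0L for a missing file, so a missing file counts as stale.
  def isStaleLoggedStore(offsetFile: File): Boolean =
    clock() - offsetFile.lastModified() > deleteRetentionMs

  // Assumption: an offset file is valid if it exists and has non-blank contents.
  // This is the only check that reads the file.
  def isOffsetFileValid(offsetFile: File): Boolean =
    offsetFile.isFile && {
      try new String(Files.readAllBytes(offsetFile.toPath), StandardCharsets.UTF_8).trim.nonEmpty
      catch { case _: IOException => false }
    }

  // True only if every check passes; && short-circuits, so the cheap metadata
  // checks run first and the file contents are read last.
  def isStoreValid(storeName: String, offsetFile: File): Boolean =
    persistedStores.contains(storeName) &&
      !isStaleLoggedStore(offsetFile) &&
      isOffsetFileValid(offsetFile)
}
```

Ordering the checks from cheapest to most expensive means the file is only opened when the store is persisted and recent enough, which keeps "is it present and how old is it" separate from "are its contents usable", as the review asks.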