> On Oct. 5, 2016, 2:08 a.m., Jake Maes wrote:
> > Looks better, but I think there's still one major part missing.
> > 
> > In order to have agreement between a kafka changelog and the task storage,
> > the changelog should be created with the same delete.retention.ms property.
> > 
> > There are 2 ways to do this:
> > 1. (preferred) Update the kafka system admin to read the samza changelog
> > property that you've defined (which also needs to be added to the config
> > table, btw) and create the topic with that value for delete.retention.ms.
> > 2. Rename the property so it's one of the "topic-level-property" configs
> > and gets automatically passed to kafka. This is convenient but wouldn't
> > apply to other systems, which could be useful if those other systems have
> > a delete retention policy.
> 
> Shanthoosh Venkataraman wrote:
>     I think 1) is the only plausible way to accomplish this through job
>     config. The delete.retention.ms configuration is associated only with a
>     store's changelog and is not applicable to topics in general, so making
>     it a topic-level property might not be a good idea. Enforcing the
>     delete.retention.ms property is harder to accomplish through config,
>     since kafka is an external system. Ideally, if there's a way to fetch
>     kafka metadata/config (delete.retention.ms) about a topic, we could
>     fetch that value during container startup rather than expecting users
>     to specify it.
> 
> Jake Maes wrote:
>     Please take a look at
>     org.apache.samza.config.KafkaConfig#getChangelogKafkaProperties

Done.


- Shanthoosh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/52476/#review151439
-----------------------------------------------------------


On Oct. 17, 2016, 10:40 p.m., Shanthoosh Venkataraman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/52476/
> -----------------------------------------------------------
> 
> (Updated Oct. 17, 2016, 10:40 p.m.)
> 
> 
> Review request for samza.
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Every local task store is backed by a kafka changelog topic. Because of log
> compaction, delete tombstones in the changelog topic have a TTL of
> delete.retention.ms. Replaying events from a changelog whose delete
> tombstones have already been removed would create an inconsistent local
> store, since some delete events are missing. During container startup, this
> patch deletes every local store for which the difference between the current
> time and the last modified time of the offset file is greater than
> delete.retention.ms.
> 
> 
> Diffs
> -----
> 
>   samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 9329edf7d724f3a0d9235354bb77936f713e3b5f 
>   samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala be3f1068f7921c9c8f8697577efea6580672813e 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 05a996c98075ea8ed3767af666b9beeb1933f2a6 
>   samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala 0b7bcdda1639eea8239a69c31bdf42558e9077d2 
>   samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala 4d40f520e54beb643acd8410c772b75e2f6a9162 
>   samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 973ab8cfb3d248bec7efe5e338f5e667f097556d 
> 
> Diff: https://reviews.apache.org/r/52476/diff/
> 
> 
> Testing
> -------
> 
> Unit and manual testing have been done to verify the functionality.
> 
> 
> Thanks,
> 
> Shanthoosh Venkataraman
> 
>
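
For readers following the thread, the startup check from the Description can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in the patch: the class StaleStoreCleaner, the methods isStale and deleteIfStale, and the offsetFileName parameter are all hypothetical names, and the sketch skips stores that have no offset file rather than guessing how the patch treats them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not a Samza class.
class StaleStoreCleaner {

    // True when the offset file was last modified more than
    // deleteRetentionMs before nowMs, i.e. tombstones written since then
    // may already have been removed from the compacted changelog.
    static boolean isStale(long offsetLastModifiedMs, long nowMs, long deleteRetentionMs) {
        return nowMs - offsetLastModifiedMs > deleteRetentionMs;
    }

    // Recursively deletes storeDir if its offset file is stale.
    // Returns true when the store directory was deleted.
    static boolean deleteIfStale(Path storeDir, String offsetFileName,
                                 long nowMs, long deleteRetentionMs) {
        Path offsetFile = storeDir.resolve(offsetFileName);
        if (!Files.isRegularFile(offsetFile)) {
            // Assumption: stores without an offset file are handled elsewhere.
            return false;
        }
        try {
            long lastModifiedMs = Files.getLastModifiedTime(offsetFile).toMillis();
            if (!isStale(lastModifiedMs, nowMs, deleteRetentionMs)) {
                return false;
            }
            List<Path> paths;
            try (Stream<Path> walk = Files.walk(storeDir)) {
                // Reverse order puts children before their parent directories.
                paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path p : paths) {
                Files.delete(p);
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller would pass System.currentTimeMillis() as nowMs and the changelog's configured delete.retention.ms as deleteRetentionMs. The comparison is strict, so a store whose offset file is exactly delete.retention.ms old is kept.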