Hi, Yi,

I couldn't find any errors in the log indicating an issue writing to those
particular changelog partitions. I even went ahead and removed all
checkpoint, coordinator and changelog topics and started fresh. The issue
still manifests itself, with offsets of 0:
partition  offset
-----------------
0          0
1          54251
2          54315
3          54196
4          53548
5          53581
6          55175
7          54599
8          53694
9          0
10         53456
11         53450
12         0
13         54442
14         54759
15         54958
16         54909
17         53396
18         55442
19         54121

In this case, partitions 0, 2, 8 and 14 all run in the same YARN container,
which means it's not a container-specific issue (partitions 2, 8 and 14 all
get proper changelogs written).

As I mentioned earlier, the changelog topic was auto-created by the Samza
job, so no manual overrides such as "auto-commit" were given.

Thanks,
David

On Mon, Jun 13, 2016 at 9:40 AM, Yi Pan <[email protected]> wrote:

> Hi, David,
>
> Did you check the log to see whether there are any log lines indicating
> producer issues on the three partitions that you suspect? And could you
> also check whether you have auto-commit turned on? If auto-commit is on
> and the producer does not report any issue writing to the changelog
> topic, you may want to compare the local RocksDB with the one persisted
> in the changelog to verify that there are indeed discrepancies between
> them. Samza provides a command line tool, state-storage-tool.sh, to
> recover the RocksDB state store from the changelog. You can use it to
> recover the state store from the changelog and compare it with the local
> RocksDB to verify whether there are any discrepancies.
>
> Best.
>
> -Yi
>
> On Sun, Jun 12, 2016 at 12:49 PM, David Yu <[email protected]>
> wrote:
>
> > Jagadish,
> >
> > Everything you described matches my understanding.
> >
> > Here are our settings:
> > - Our task aggregates user events into user sessions.
> > - We have one k-v store for each task, which tracks active user sessions
> >   (with sessionId as the key).
> > - When a user session expires, the session is removed from the store.
> > - The changelog topic was auto-created with cleanup.policy=*compact*.
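Yi's suggested check (restore the store from the changelog with state-storage-tool.sh and compare it with the local RocksDB) comes down to a key-by-key diff. The Java sketch below assumes both stores have already been dumped into in-memory `Map<String, String>` snapshots; producing those dumps is not shown, and the class and method names (`StoreDiff`, `diff`) are hypothetical, not part of Samza.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;

public class StoreDiff {
    // Compares two key/value snapshots and describes every key whose value
    // differs, including keys present in only one snapshot. A missing key and a
    // null value are treated the same way (Map.get returns null for both).
    public static List<String> diff(Map<String, String> local, Map<String, String> restored) {
        TreeSet<String> keys = new TreeSet<>(local.keySet()); // sorted for stable output
        keys.addAll(restored.keySet());
        List<String> differences = new ArrayList<>();
        for (String key : keys) {
            String localValue = local.get(key);
            String restoredValue = restored.get(key);
            if (!Objects.equals(localValue, restoredValue)) {
                differences.add(key + ": local=" + localValue + ", changelog=" + restoredValue);
            }
        }
        return differences;
    }

    public static void main(String[] args) {
        // Made-up snapshots: session2 exists locally but not in the restored copy.
        Map<String, String> local = new TreeMap<>();
        local.put("session1", "Session1_2");
        local.put("session2", "Session2_1");
        Map<String, String> restored = new TreeMap<>();
        restored.put("session1", "Session1_2");
        diff(local, restored).forEach(System.out::println);
        // prints: session2: local=Session2_1, changelog=null
    }
}
```

An empty result means the changelog fully reproduces the local store; keys reported with `changelog=null` are writes that never made it to (or were not restored from) the changelog partition.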
> >
> > In terms of log compaction, I'm expecting it to keep the last log entry
> > for a given key and delete all previous entries. For example, if we
> > have:
> >
> > store.put("session1", Session1_1) // session created
> > store.put("session1", Session1_2) // session updated
> > store.delete("session1") // session expired
> >
> > I'm expecting something like the following in the changelog (before
> > compaction):
> >
> > 1 session1=Session1_1
> > 2 session1=Session1_2
> > 3 session1=NULL
> >
> > with only offset 3 retained after compaction. The next log entry should
> > take offset 4. In that sense, the offsets should always increase
> > monotonically, with lots of gaps in between due to compaction.
> >
> > So again, I'm not sure why we have three changelog partitions whose
> > offsets stopped moving.
> >
> > Thanks,
> > David
> >
> > On Sun, Jun 12, 2016 at 11:09 AM, Jagadish Venkatraman <
> > [email protected]> wrote:
> >
> > > Some context: each k-v store has a changelog topic. The number of
> > > partitions in that changelog topic is equal to the number of tasks.
> > > Each task's k-v store is mapped to a particular partition of that
> > > changelog topic. This mapping from task name to changelog partition
> > > number is stored in the coordinator stream.
> > >
> > > Of course, you don't want this k-v changelog topic to keep growing, so
> > > people configure it with some expiration. The expiration can be either:
> > > 1. Time retention: records older than the retention period are purged.
> > > 2. Compaction: newer values overwrite older values for the same key,
> > >    and only the most recent value is retained.
> > >
> > > I'm not sure whether offsets in Kafka always increase monotonically or
> > > could change after compaction or time-based retention kicks in for the
> > > topic partition.
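On the offset question: in Kafka, offsets within a partition are assigned monotonically, and neither compaction nor time-based retention renumbers the records that survive, so the log end offset keeps advancing as long as new records are written. The single-partition model below replays David's put/put/delete example to illustrate this. It is a simplified sketch, not Kafka's implementation: it ignores that the active segment is never compacted and that tombstones are only dropped after delete.retention.ms. Offsets start at 1 only to match the numbering in the example (Kafka starts at 0), and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // One record in a single partition; a null value is a tombstone (delete).
    public static final class Entry {
        public final long offset;
        public final String key;
        public final String value;

        Entry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return offset + " " + key + "=" + (value == null ? "NULL" : value);
        }
    }

    private final List<Entry> log = new ArrayList<>();
    // Starts at 1 only to match the numbering in the thread; Kafka starts at 0.
    private long nextOffset = 1;

    // Appends a record and returns the offset it was assigned.
    public long append(String key, String value) {
        log.add(new Entry(nextOffset, key, value));
        return nextOffset++;
    }

    // Simplified compaction: keep only the latest record per key. Survivors keep
    // their original offsets, and nextOffset is never rewound.
    public void compact() {
        Map<String, Long> latest = new HashMap<>();
        for (Entry e : log) {
            latest.put(e.key, e.offset);
        }
        log.removeIf(e -> latest.get(e.key) != e.offset);
    }

    public List<Entry> retained() {
        return log;
    }

    // Offset the next appended record will receive (the "log end offset").
    public long logEndOffset() {
        return nextOffset;
    }

    public static void main(String[] args) {
        CompactionSketch partition = new CompactionSketch();
        partition.append("session1", "Session1_1"); // session created
        partition.append("session1", "Session1_2"); // session updated
        partition.append("session1", null);         // session expired
        partition.compact();
        System.out.println(partition.retained());   // [3 session1=NULL]
        System.out.println(partition.append("session2", "Session2_1")); // 4
        System.out.println(partition.logEndOffset());                   // 5
    }
}
```

Under this model, compaction can create gaps but cannot make the end offset stand still, so a partition whose end offset stays flat (or at 0 after a fresh start) suggests records are not reaching it at all, rather than being compacted away.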
> > >
> > > On Sat, Jun 11, 2016 at 11:53 PM, David Yu <[email protected]>
> > > wrote:
> > >
> > > > My understanding of the store changelog is that each task writes
> > > > store changes to a particular changelog partition for that task.
> > > > (Does that mean the changelog keys are task names?)
> > > >
> > > > One thing that confuses me is that the last offsets of some changelog
> > > > partitions do not move. I'm using the Kafka GetOffsetShell tool to get
> > > > the last offset for each partition. The result looks like this:
> > > >
> > > > partition  offset
> > > > 0          7090
> > > > 1          3737937
> > > > 2          3733222
> > > > 3          3719065
> > > > 4          3730208
> > > > 5          3731128
> > > > 6          3734669
> > > > 7          3691461
> > > > 8          3759133
> > > > 9          7286
> > > > 10         3690347
> > > > 11         3722450
> > > > 12         7376
> > > > 13         3738454
> > > > 14         3742316
> > > > 15         3710512
> > > > 16         3777267
> > > > 17         3750596
> > > > 18         3728185
> > > > 19         3694470
> > > >
> > > > As you can see, three of the partitions barely got any updates. In
> > > > fact, their offsets stopped moving for a while. The traffic for each
> > > > task should be fairly balanced. I checked the task log and made sure
> > > > that the stores for these partitions are actively updated.
> > > >
> > > > Any idea why this is happening? Or am I missing something?
> > > >
> > > > Thanks,
> > > > David
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
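The stalled partitions in this thread were spotted by eye. A small helper like the hypothetical one below flags any partition whose end offset falls far below the median across partitions, which is a reasonable baseline when per-task traffic is expected to be balanced. The 10% threshold is an arbitrary assumption, and the offsets are copied from the GetOffsetShell output quoted above. It only detects the symptom; it says nothing about the cause.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StalledPartitions {
    // Returns the partitions (array indices) whose end offset is below
    // `fraction` times the median end offset across all partitions.
    public static List<Integer> flag(long[] endOffsets, double fraction) {
        List<Integer> flagged = new ArrayList<>();
        if (endOffsets.length == 0) {
            return flagged; // nothing to compare against
        }
        long[] sorted = endOffsets.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        double median = (n % 2 == 1)
                ? sorted[n / 2]
                : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
        for (int partition = 0; partition < endOffsets.length; partition++) {
            if (endOffsets[partition] < fraction * median) {
                flagged.add(partition);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // End offsets for partitions 0..19, from the GetOffsetShell output above.
        long[] offsets = {
            7090, 3737937, 3733222, 3719065, 3730208, 3731128, 3734669,
            3691461, 3759133, 7286, 3690347, 3722450, 7376, 3738454,
            3742316, 3710512, 3777267, 3750596, 3728185, 3694470
        };
        System.out.println(flag(offsets, 0.1)); // [0, 9, 12]
    }
}
```

Running it on the post-reset numbers from the latest message flags the same partitions, 0, 9 and 12, which is consistent with the problem following specific partitions across a full restart.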
