Hey Tommy,

This sounds broken. Let me have a look and see if there's an easy fix. I *think* reordering the startup so that producers are started before tasks should work, but I want to make sure.
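The reordering suggested above can be illustrated with a toy model of container startup. It assumes, as a simplification of the quoted trace, that a send only succeeds once producers have been started. `Producers`, `LoggedStore`, `Task`, and `run_container` are hypothetical stand-ins, not Samza classes, and unlike the real cached store, this `LoggedStore` sends on every put rather than on cache flush.

```python
# Toy model of the container startup order discussed in this thread.
# None of these classes are Samza APIs; they are hypothetical stand-ins.


class Producers:
    """Stand-in for SystemProducers: sends fail until start() has run."""

    def __init__(self):
        self.registered = set()  # sources registered by tasks
        self.started = set()     # sources whose producers are live
        self.sent = []           # (source, message) pairs actually sent

    def register(self, source):
        self.registered.add(source)

    def start(self):
        self.started = set(self.registered)

    def send(self, source, message):
        if source not in self.started:
            # Toy counterpart of "NoSuchElementException: key not found"
            raise KeyError(f"key not found: {source}")
        self.sent.append((source, message))


class LoggedStore:
    """Stand-in for a changelog-backed store: every put is also sent."""

    def __init__(self, source, producers):
        self.source = source
        self.producers = producers
        self.data = {}

    def put(self, key, value):
        self.data[key] = value
        self.producers.send(self.source, (key, value))


class Task:
    """A task whose init() writes to its store, as in the report below."""

    def __init__(self, source, producers):
        self.store = LoggedStore(source, producers)
        producers.register(source)

    def init(self):
        self.store.put("warmup", 1)


def run_container(producers_first):
    producers = Producers()
    task = Task("TaskName-Partition 3", producers)
    if producers_first:
        producers.start()   # reordered: producers are live before init()
        task.init()
    else:
        task.init()         # current order: startTask before startProducers
        producers.start()
    return producers.sent
```

Under this model, `run_container(producers_first=True)` records one changelog send, while `run_container(producers_first=False)` raises `KeyError` from inside `init()`.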
Could you open a JIRA and set the Fix Version to 0.9.0? I'll take a look today or tomorrow. If you want to try out the reordering yourself, please share any findings. :)

Cheers,
Chris

On Monday, February 16, 2015, Tommy Becker <tobec...@tivo.com> wrote:

> I need to do some initial processing of the entries in my KV store on
> startup before processing messages. I put the code into my task's init()
> method, and although it worked with an empty KV store/changelog, once I
> have entries in there it bombs with a rather obscure exception:
>
> java.util.NoSuchElementException: key not found: TaskName-Partition 3
>     at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
>     at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
>     at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
>     at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
>     at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
>     at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
>     at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
>     ...
>
> After some investigation, I see that it's actually not safe to interact
> from init() with anything that may produce messages, since startTask is
> called before startProducers in SamzaContainer.run. In retrospect, I guess
> that is why a MessageCollector is not passed to init(), but of course
> writes to the KV store result in messages being sent to the changelog :/
> My question is whether this is intended behavior (could we not simply
> initialize producers before tasks?) and, if so, what an alternative might
> be for my use case. As it stands, it seems like all I can do is add an
> "initProcessingDone" flag to my task and check it every time a message
> comes in.
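The "initProcessingDone" workaround amounts to lazy, one-time initialization on the first process() call, by which point producers are running. The quoted trace also shows that a write is not the only trigger: the failing call is KeyValueStorageEngine.get, where CachedStore.get inserts into the cache's LinkedHashMap, removeEldestEntry calls CachedStore.flush, and LoggedStore.putAll then sends to the changelog. The sketch below models only the flag pattern. `WarmupTask`, its methods, and the `+ 1` placeholder processing are hypothetical, and a plain dict stands in for a Samza KeyValueStore.

```python
# Sketch of the "initProcessingDone" workaround: defer the one-time
# store pass from init() to the first process() call.
# All names are hypothetical; this is not Samza's task API.


class WarmupTask:
    def __init__(self, store):
        self.store = store                  # dict-like stand-in for a KV store
        self.init_processing_done = False   # the "initProcessingDone" flag
        self.warmup_count = 0
        self.handled = []

    def init(self):
        # Runs before producers are started, so avoid touching a
        # changelog-backed store here (even reads can flush the cache).
        pass

    def _initial_processing(self):
        # One-time pass over existing entries; "+ 1" is placeholder work.
        for key, value in list(self.store.items()):
            self.store[key] = value + 1
            self.warmup_count += 1

    def process(self, message):
        # Checked on every message; true work happens only the first time.
        if not self.init_processing_done:
            self._initial_processing()
            self.init_processing_done = True
        self.handled.append(message)
```

The steady-state cost is one boolean check per message; the trade-off is that the first message after startup waits for the full pass over the store.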