I need to do some initial processing of the entries in my KV store on startup,
before processing any messages. I put the code in my task's init() method. That
worked with an empty KV store/changelog, but once there are entries in there it
bombs with a rather obscure exception:

java.util.NoSuchElementException: key not found: TaskName-Partition 3
    at scala.collection.MapLike$class.default(MapLike.scala:228) ~[scala-library-2.10.1.jar:na]
    at scala.collection.AbstractMap.default(Map.scala:58) ~[scala-library-2.10.1.jar:na]
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64) ~[scala-library-2.10.1.jar:na]
    at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:71) ~[samza-core_2.10-0.8.0.jar:na]
    at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) ~[samza-core_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72) ~[samza-kv_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57) ~[samza-kv_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:159) ~[samza-kv_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.CachedStore$$anon$1.removeEldestEntry(CachedStore.scala:69) ~[samza-kv_2.10-0.8.0.jar:na]
    at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299) ~[na:1.8.0_25]
    at java.util.HashMap.putVal(HashMap.java:663) ~[na:1.8.0_25]
    at java.util.HashMap.put(HashMap.java:611) ~[na:1.8.0_25]
    at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:91) ~[samza-kv_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36) ~[samza-kv_2.10-0.8.0.jar:na]
    at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44) ~[samza-kv_2.10-0.8.0.jar:na]
    ...
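Reading the frames bottom-up, the changelog send was triggered by a read, not a write. CachedStore.get inserts the fetched entry into its LRU cache. That insertion evicts the eldest entry through removeEldestEntry, and the eviction calls CachedStore.flush, which pushes pending writes through LoggedStore.putAll to the collector. The sketch below is a simplified, standalone model of that write-back behavior, reconstructed from the frame names only; it is not Samza's actual code. LoggedStoreModel, CachedStoreModel, the "flush when a dirty entry is evicted" rule and the cache size of 2 are all hypothetical, and the changelogSends list stands in for the collector send that fails when producers have not been started yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WriteBackCacheModel {
    // Hypothetical stand-in for a changelog-backed store: every putAll "sends" each key.
    static class LoggedStoreModel {
        final Map<String, String> data = new HashMap<>();
        final List<String> changelogSends = new ArrayList<>();

        void putAll(Map<String, String> entries) {
            data.putAll(entries);
            // In the real trace, this is where TaskInstanceCollector.send is reached.
            changelogSends.addAll(entries.keySet());
        }

        String get(String key) {
            return data.get(key);
        }
    }

    // Hypothetical write-back LRU cache: writes stay dirty in memory until flushed.
    static class CachedStoreModel {
        final LoggedStoreModel backing;
        final Map<String, String> dirty = new LinkedHashMap<>();
        final LinkedHashMap<String, String> cache;

        CachedStoreModel(LoggedStoreModel backing, int cacheSize) {
            this.backing = backing;
            // accessOrder = true makes this an LRU map.
            this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                    boolean evict = size() > cacheSize;
                    if (evict && dirty.containsKey(eldest.getKey())) {
                        flush(); // evicting an unflushed entry forces all pending writes out
                    }
                    return evict;
                }
            };
        }

        void put(String key, String value) {
            cache.put(key, value);
            dirty.put(key, value); // no send yet, only marked dirty
        }

        String get(String key) {
            String cached = cache.get(key);
            if (cached != null) {
                return cached;
            }
            String value = backing.get(key);
            cache.put(key, value); // a read miss inserts, which may evict and flush
            return value;
        }

        void flush() {
            if (dirty.isEmpty()) {
                return;
            }
            backing.putAll(new LinkedHashMap<>(dirty));
            dirty.clear();
        }
    }

    public static void main(String[] args) {
        LoggedStoreModel logged = new LoggedStoreModel();
        logged.data.put("existing", "v0"); // an entry already restored from the changelog
        CachedStoreModel store = new CachedStoreModel(logged, 2);
        store.put("a", "1");
        store.put("b", "2");
        System.out.println("sends before get: " + logged.changelogSends.size());
        store.get("existing"); // miss -> insert -> evict dirty "a" -> flush -> sends
        System.out.println("sends after get: " + logged.changelogSends.size());
    }
}
```

Running main prints `sends before get: 0` and then `sends after get: 2`. In this model, a store that looks quiet after a few puts can still emit changelog traffic on the next get(). That matches the trace above, where the failing send came out of KeyValueStorageEngine.get.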

After some investigation I see that it's actually not safe to call anything from
init() that might produce messages, since SamzaContainer.run calls startTask
before startProducers. In retrospect I guess that is why a MessageCollector is
not passed to init(), but of course writes to the KV store result in messages
being sent to the changelog :/

So my question is whether this is intended behavior (could we not simply
initialize producers before tasks?) and, if so, what an alternative might be for
my use case. As things stand, it seems all I can do is add an
"initProcessingDone" flag to my task and check it every time a message comes in.
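Here is a minimal sketch of the flag workaround described above. It assumes the startup pass only needs to run once, before the first message is handled. To keep it runnable without Samza on the classpath, DeferredInitTask is a hypothetical plain-Java class. A java.util.Map stands in for the KeyValueStore, and its init(...) and process(...) only mirror the roles of Samza's InitableTask.init and StreamTask.process; they do not reproduce those signatures. The startup work itself (resetting every existing value to 0) is likewise a hypothetical placeholder.

```java
import java.util.HashMap;
import java.util.Map;

public class DeferredInitTask {
    // Hypothetical stand-in for the task's KeyValueStore.
    private Map<String, Integer> store;
    // The "initProcessingDone" flag: false until the startup pass has run.
    private boolean initProcessingDone = false;
    // Number of pre-existing entries handled by the startup pass (for inspection).
    int processedAtStartup = 0;

    // Plays the role of init(): only keep references, touch nothing that may send.
    public void init(Map<String, Integer> store) {
        this.store = store;
    }

    // Plays the role of process(): runs the deferred startup pass on the first message.
    public void process(String key) {
        if (!initProcessingDone) {
            startupPass();
            initProcessingDone = true;
        }
        store.merge(key, 1, Integer::sum); // normal per-message work
    }

    // Hypothetical startup processing over all existing entries: reset each value to 0.
    private void startupPass() {
        for (Map.Entry<String, Integer> entry : store.entrySet()) {
            entry.setValue(0);
            processedAtStartup++;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("x", 5);
        store.put("y", 7);
        DeferredInitTask task = new DeferredInitTask();
        task.init(store);
        task.process("x"); // first message: startup pass runs, then the message is handled
        task.process("y"); // flag already set, startup pass is skipped
        System.out.println(store + " processedAtStartup=" + task.processedAtStartup);
    }
}
```

The per-message cost is a single boolean check. Because the startup pass now runs inside process(), it runs after startProducers in the SamzaContainer.run ordering described above, so any changelog writes it causes should have a started producer behind them.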
