Hello,
I was setting up the key-value storage engine in Samza and ran into an
exception when querying the data.
I added these properties to the config:
stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.engaged-store.changelog=kafka.engaged-store-changelog
# a custom data type with an appropriate Serde
stores.engaged-store.key.serde=UserAppPair
# wrote a Serde for Long using ByteBuffer
stores.engaged-store.msg.serde=Long
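A minimal sketch of what a ByteBuffer-based Serde for Long might look like, for readers unfamiliar with the setup. It assumes Samza's `Serde` interface exposes `toBytes` and `fromBytes`; the `Serde` trait below is a local stand-in so the snippet compiles without Samza on the classpath, and the class name `LongSerde` is hypothetical, not the class from this job.

```scala
import java.nio.ByteBuffer

// Local stand-in for org.apache.samza.serializers.Serde (assumption: the
// real interface has the same two methods). Swap in the real import in a job.
trait Serde[T] {
  def toBytes(obj: T): Array[Byte]
  def fromBytes(bytes: Array[Byte]): T
}

// Hypothetical name. Encodes a Long as 8 big-endian bytes.
class LongSerde extends Serde[java.lang.Long] {
  def toBytes(obj: java.lang.Long): Array[Byte] =
    if (obj == null) null
    else ByteBuffer.allocate(8).putLong(obj.longValue).array()

  def fromBytes(bytes: Array[Byte]): java.lang.Long =
    if (bytes == null) null
    else java.lang.Long.valueOf(ByteBuffer.wrap(bytes).getLong)
}
```

Passing null through in both directions is a design choice in this sketch, so that a missing key does not fail inside the serde.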
I have no trouble initializing the storage engine with:
val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
but when I query by key while processing messages, it throws an
exception:
val key = new UserAppPair(userId, appId);
val value = store.get(key);
Here’s the log:
2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
org.rocksdb.RocksDBException: IO error: directory: Invalid argument
at org.rocksdb.RocksDB.open(Native Method)
at org.rocksdb.RocksDB.open(RocksDB.java:133)
at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
The same exception is thrown if I try to put a value into RocksDB. Has anyone
run into this problem before, or does anyone have pointers on solving it?
Lukas