Hey Lukas,

> This happens every time even if I spin up a new VM.
Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're
not using EBS or anything like that, are you? I want to see if you're getting
hit by that setErrorIfExists line. If you:

1. Stop your job.
2. Clear the state from the FS.
3. Start your job.

Does it work?

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org> wrote:

> Hey Lukas,
>
> Could you try clearing out the state and starting the job?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>
>> This happens every time, even if I spin up a new VM. It happens after a
>> restart as well.
>>
>> Lukas
>>
>> -----Original Message-----
>> From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 11:01 AM
>> To: dev@samza.apache.org
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> Interesting. Does this happen only after restarting your job, or the
>> first time as well? I'm wondering if this is the problem:
>>
>> options.setErrorIfExists(true)
>>
>> in RocksDbKeyValueStore.scala. I think this is set under the assumption
>> that the job is run in YARN. If you run locally, it seems to me that the
>> directory would continue to exist after a job is restarted. If you delete
>> your state directory and restart your job, does the problem temporarily
>> go away until a subsequent restart happens?
>>
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>
>>> Hi Chris,
>>>
>>> 1. We're running locally using ProcessJobFactory.
>>> 2. CentOS 7 x86_64
>>> 3. startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>
>>> Also note that the properties file sets the default offset to oldest,
>>> but the log file says that it's setting the offset to largest:
>>> "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>
>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>> I checked the directory, and it actually exists:
>>>
>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>
>>> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>> 0   /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>> 0   /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>> 16K /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>> 36K /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>> 36K /vagrant/SamzaJobs/deploy/samza/state
>>>
>>> Lukas
>>>
>>> -----Original Message-----
>>> From: Chris Riccomini
>>> Sent: Monday, February 16, 2015 5:53 PM
>>> To: dev@samza.apache.org
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>> Hey Lukas,
>>>
>>> It looks like the exception is actually thrown on get, not put:
>>>
>>> at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>
>>> 1. Are you running your job under YARN, or as a local job
>>>    (ThreadJobFactory/ProcessJobFactory)?
>>> 2. What OS are you running on?
>>> 3. Could you post a full copy of your logs somewhere (a GitHub gist,
>>>    pasteboard, or something)?
>>> 4.
>>> Also, what does this line say in your logs:
>>>
>>> info("Got storage engine base directory: %s" format storeBaseDir)
>>>
>>> It sounds like something is getting messed up with the directory where
>>> the RocksDB store is trying to keep its data.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me> wrote:
>>>
>>>> Hello,
>>>>
>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>> exception when querying the data.
>>>>
>>>> I added these properties to the config:
>>>>
>>>> stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>> stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>> # a custom data type with an appropriate Serde
>>>> stores.engaged-store.key.serde=UserAppPair
>>>> # wrote a Serde for Long using ByteBuffer
>>>> stores.engaged-store.msg.serde=Long
>>>>
>>>> I have no trouble initializing the storage engine with:
>>>>
>>>> val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>
>>>> but when I query by key while processing messages, it throws an
>>>> exception:
>>>>
>>>> val key = new UserAppPair(userId, appId);
>>>> val value = store.get(key);
>>>>
>>>> Here's the log:
>>>>
>>>> 2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>> 2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an
>>>> invalid or empty offset None for [Follows,0]. Attempting to use Kafka's
>>>> auto.offset.reset setting. This can result in data loss if processing
>>>> continues.
>>>> 2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is
>>>> defined for topic Follows
>>>> 2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>> 2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>> 2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>> org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>     at org.rocksdb.RocksDB.open(Native Method)
>>>>     at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>     at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>     at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>     at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>     at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>     at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>     at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>     at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>     at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>     at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>     at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>     at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>     at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>     at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>     at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>     at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>> 2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>> 2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>> 2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>
>>>> The same exception is thrown if I try to put a value into RocksDB. Has
>>>> anyone run into this problem before, or does anyone have pointers on
>>>> solving it?
>>>>
>>>> Lukas
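[Editor's note: the config comments mention a custom "Serde for Long using ByteBuffer". A minimal sketch of what such a serde might look like, reduced here to two static helpers in plain Java; Samza's actual Serde interface and Lukas's implementation may differ.]

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: a Long <-> byte[] serde built on ByteBuffer,
// as the "stores.engaged-store.msg.serde=Long" config comment describes.
public class LongSerde {
    // Serialize a Long into its 8-byte big-endian representation.
    public static byte[] toBytes(Long obj) {
        return ByteBuffer.allocate(Long.BYTES).putLong(obj).array();
    }

    // Deserialize 8 bytes back into a Long.
    public static Long fromBytes(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```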
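[Editor's note: Chris's suggested step of clearing the state from the FS amounts to recursively deleting the local state directory while the job is stopped. A sketch in Java; the path is the one reported in Lukas's logs and is purely illustrative, so adjust it to your own deploy layout.]

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustrative sketch only: clear a Samza job's local state directory.
// Run this only while the job is stopped; the changelog topic will
// restore the store contents on the next start.
public class ClearState {
    // Recursively delete a directory tree, walking in reverse (depth-first)
    // order so files and subdirectories are removed before their parents.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> {
                     try {
                         Files.delete(p);
                     } catch (IOException e) {
                         throw new UncheckedIOException(e);
                     }
                 });
        }
    }

    public static void main(String[] args) throws IOException {
        // Base directory from Lukas's logs -- an assumption, not a default
        // that applies everywhere.
        deleteRecursively(Paths.get("/vagrant/SamzaJobs/deploy/samza/state"));
    }
}
```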