Hey Lukas,

> This happens every time even if I spin up a new VM.

Ah, I might have misunderstood. Are your VMs started with a fresh FS? You're
not using EBS or anything like that, are you?

I want to see if you're getting hit by that setErrorIfExists line. If you:

1. Stop your job.
2. Clear the state from the FS.
3. Start your job.

Does it work?
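
For step 2, something like this should do it (assuming the state directory
from your earlier du output; adjust the path if your base directory is
different):

    rm -rf /vagrant/SamzaJobs/deploy/samza/state/engaged-store

That wipes the local RocksDB data; since you have a changelog configured for
the store, it should be rebuilt from kafka.engaged-store-changelog when the
job comes back up.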

Cheers,
Chris

On Tue, Feb 17, 2015 at 12:07 PM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey Lukas,
>
> Could you try clearing out the state, and starting the job?
>
> Cheers,
> Chris
>
> On Tue, Feb 17, 2015 at 11:33 AM, Lukas Steiblys <lu...@doubledutch.me>
> wrote:
>
>> This happens every time even if I spin up a new VM. Happens after a
>> restart as well.
>>
>> Lukas
>>
>> -----Original Message----- From: Chris Riccomini
>> Sent: Tuesday, February 17, 2015 11:01 AM
>> To: dev@samza.apache.org
>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>
>> Hey Lukas,
>>
>> Interesting. Does this happen only after restarting your job? Or does it
>> happen the first time, as well? I'm wondering if this is the problem:
>>
>>    options.setErrorIfExists(true)
>>
>> In RocksDbKeyValueStore.scala. I think this is set under the assumption
>> that the job is run in YARN. If you run locally, it seems to me that the
>> directory would continue to exist after a job is restarted. If you delete
>> your state directory and restart your job, does the problem temporarily go
>> away until a subsequent restart happens?
>>
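>> To illustrate what I mean (just a rough sketch against the RocksDB Java API,
>> not our actual code, and the path is made up), errorIfExists makes the second
>> open of the same directory fail:
>>
>>     import org.rocksdb.{Options, RocksDB}
>>
>>     RocksDB.loadLibrary()
>>     val options = new Options()
>>       .setCreateIfMissing(true)
>>       .setErrorIfExists(true) // the line in question
>>     val path = "/tmp/rocksdb-demo" // made-up path
>>     RocksDB.open(options, path).close() // first open: creates the DB directory
>>     RocksDB.open(options, path).close() // second open: throws RocksDBException,
>>                                         // since the DB already exists
>>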
>> Cheers,
>> Chris
>>
>> On Tue, Feb 17, 2015 at 10:55 AM, Lukas Steiblys <lu...@doubledutch.me>
>> wrote:
>>
>>> Hi Chris,
>>>
>>> 1. We're running locally using ProcessJobFactory
>>> 2. CentOS 7 x86_64
>>> 3.
>>>    startup.log: https://gist.github.com/imbusy/0592a9c52a96fcce48db
>>>    engaged-users.log: https://gist.github.com/imbusy/0b3d264a40ddf34ab8e7
>>>    engaged-users.properties: https://gist.github.com/imbusy/d0019db29d7b68c60bfc
>>>
>>>    Also note that the properties file sets the default offset to oldest, but
>>> the log file says that it's setting the offset to largest:
>>> "2015-02-17 18:46:32 GetOffset [INFO] Got reset of type largest."
>>>
>>> 4. From the log file: "2015-02-17 18:45:57 SamzaContainer$ [INFO] Got
>>> storage engine base directory: /vagrant/SamzaJobs/deploy/samza/state"
>>>    I checked the directory and it actually exists:
>>>
>>> du -h /vagrant/SamzaJobs/deploy/samza/state
>>>
>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 0
>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 1
>>> 0    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 2
>>> 16K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store/Partition 3
>>> 36K    /vagrant/SamzaJobs/deploy/samza/state/engaged-store
>>> 36K    /vagrant/SamzaJobs/deploy/samza/state
>>>
>>> Lukas
>>>
>>> -----Original Message----- From: Chris Riccomini
>>> Sent: Monday, February 16, 2015 5:53 PM
>>> To: dev@samza.apache.org
>>> Subject: Re: RocksDBException: IO error: directory: Invalid argument
>>>
>>>
>>> Hey Lukas,
>>>
>>> It looks like the exception is actually thrown on get, not put:
>>>
>>>          at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>
>>> 1. Are you running your job under YARN, or as a local job
>>> (ThreadJobFactory/ProcessJobFactory)?
>>> 2. What OS are you running on?
>>> 3. Could you post a full copy of your logs somewhere (github gist,
>>> pasteboard, or something)?
>>> 4. Also, what does this line say in your logs:
>>>
>>>    info("Got storage engine base directory: %s" format storeBaseDir)
>>>
>>> It sounds like something is getting messed up with the directory where
>>> the
>>> RocksDB store is trying to keep its data.
>>>
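>>> If you want a quick sanity check outside Samza, something like this (a
>>> minimal sketch using the RocksDB Java API directly; the path below is a
>>> placeholder, point it under your storage base directory) would tell us
>>> whether plain RocksDB can write there at all:
>>>
>>>     import org.rocksdb.{Options, RocksDB}
>>>
>>>     object RocksDbProbe {
>>>       def main(args: Array[String]): Unit = {
>>>         RocksDB.loadLibrary()
>>>         val options = new Options().setCreateIfMissing(true)
>>>         // placeholder path: use your storage engine base directory here
>>>         val db = RocksDB.open(options, "/path/to/state/probe")
>>>         db.put("k".getBytes("UTF-8"), "v".getBytes("UTF-8"))
>>>         println(new String(db.get("k".getBytes("UTF-8")), "UTF-8"))
>>>         db.close()
>>>       }
>>>     }
>>>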
>>> Cheers,
>>> Chris
>>>
>>> On Mon, Feb 16, 2015 at 3:50 PM, Lukas Steiblys <lu...@doubledutch.me>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I was setting up the key-value storage engine in Samza and ran into an
>>>> exception when querying the data.
>>>>
>>>> I added these properties to the config:
>>>>
>>>>
>>>>     stores.engaged-store.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
>>>>     stores.engaged-store.changelog=kafka.engaged-store-changelog
>>>>     # a custom data type with an appropriate Serde
>>>>     stores.engaged-store.key.serde=UserAppPair
>>>>     # wrote a Serde for Long using ByteBuffer
>>>>     stores.engaged-store.msg.serde=Long
>>>>
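>>>> For reference, the ByteBuffer-based Long serde is roughly like this (an
>>>> illustrative sketch rather than the exact code; class names are made up):
>>>>
>>>>     import java.nio.ByteBuffer
>>>>     import org.apache.samza.config.Config
>>>>     import org.apache.samza.serializers.{Serde, SerdeFactory}
>>>>
>>>>     class LongSerde extends Serde[java.lang.Long] {
>>>>       def toBytes(value: java.lang.Long): Array[Byte] =
>>>>         ByteBuffer.allocate(8).putLong(value).array()
>>>>       def fromBytes(bytes: Array[Byte]): java.lang.Long =
>>>>         ByteBuffer.wrap(bytes).getLong()
>>>>     }
>>>>
>>>>     // registered in the properties file under the serde name "Long"
>>>>     class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
>>>>       def getSerde(name: String, config: Config): Serde[java.lang.Long] =
>>>>         new LongSerde
>>>>     }
>>>>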
>>>> I have no trouble initializing the storage engine with:
>>>>
>>>>     val store = context.getStore("engaged-store").asInstanceOf[KeyValueStore[UserAppPair, Long]];
>>>>
>>>> but when I query by key while processing messages, it throws an exception:
>>>>
>>>>     val key = new UserAppPair(userId, appId);
>>>>     val value = store.get(key);
>>>>
>>>> Here’s the log:
>>>>
>>>>     2015-02-16 23:30:18 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>     2015-02-16 23:30:18 BrokerProxy [WARN] It appears that we received an invalid or empty offset None for [Follows,0]. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues.
>>>>     2015-02-16 23:30:18 GetOffset [INFO] Checking if auto.offset.reset is defined for topic Follows
>>>>     2015-02-16 23:30:18 GetOffset [INFO] Got reset of type largest.
>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Starting BrokerProxy for localhost:9092
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Entering run loop.
>>>>     2015-02-16 23:30:23 EngagedUsersTask [INFO] about to query for key in rocksdb.
>>>>     2015-02-16 23:30:23 SamzaContainer [ERROR] Caught exception in process loop.
>>>>     org.rocksdb.RocksDBException: IO error: directory: Invalid argument
>>>>         at org.rocksdb.RocksDB.open(Native Method)
>>>>         at org.rocksdb.RocksDB.open(RocksDB.java:133)
>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db$lzycompute(RocksDbKeyValueStore.scala:85)
>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.db(RocksDbKeyValueStore.scala:85)
>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:92)
>>>>         at org.apache.samza.storage.kv.RocksDbKeyValueStore.get(RocksDbKeyValueStore.scala:80)
>>>>         at org.apache.samza.storage.kv.LoggedStore.get(LoggedStore.scala:41)
>>>>         at org.apache.samza.storage.kv.SerializedKeyValueStore.get(SerializedKeyValueStore.scala:36)
>>>>         at org.apache.samza.storage.kv.CachedStore.get(CachedStore.scala:90)
>>>>         at org.apache.samza.storage.kv.NullSafeKeyValueStore.get(NullSafeKeyValueStore.scala:36)
>>>>         at org.apache.samza.storage.kv.KeyValueStorageEngine.get(KeyValueStorageEngine.scala:44)
>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.engaged(EngagedUsersTask.scala:66)
>>>>         at me.doubledutch.analytics.task.EngagedUsersTask.process(EngagedUsersTask.scala:100)
>>>>         at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:137)
>>>>         at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
>>>>         at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:136)
>>>>         at org.apache.samza.container.RunLoop$$anonfun$process$2.apply(RunLoop.scala:93)
>>>>         at org.apache.samza.util.TimerUtils$class.updateTimer(TimerUtils.scala:37)
>>>>         at org.apache.samza.container.RunLoop.updateTimer(RunLoop.scala:36)
>>>>         at org.apache.samza.container.RunLoop.process(RunLoop.scala:79)
>>>>         at org.apache.samza.container.RunLoop.run(RunLoop.scala:65)
>>>>         at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:556)
>>>>         at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:108)
>>>>         at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:87)
>>>>         at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down.
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down consumer multiplexer.
>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down BrokerProxy for localhost:9092
>>>>     2015-02-16 23:30:23 DefaultFetchSimpleConsumer [WARN] Reconnect due to socket error: null
>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Got closed by interrupt exception in broker proxy thread.
>>>>     2015-02-16 23:30:23 BrokerProxy [INFO] Shutting down due to interrupt.
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down producer multiplexer.
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stream tasks.
>>>>     2015-02-16 23:30:23 SamzaContainer [INFO] Shutting down task instance stores.
>>>>
>>>>
>>>> The same exception is thrown if I try to put a value into RocksDB. Has
>>>> anyone run into this problem before, or does anyone have pointers on
>>>> solving it?
>>>>
>>>> Lukas
>>>>
>>>>
>>>
>>>
>>
>
