Hi,

I have never seen this before. I would expect this exception when the write 
batch is flushed and contains a write against a column family that does not 
exist (anymore). However, in RocksDBCachingPriorityQueueSet we initialize 
everything relevant (the CF handle) as final, and we never drop any column 
families or exchange the db instance used with the write batch, except after 
the timer service and the write batch are already closed, in dispose(). It 
would have been nice if they had added the name of the missing CF to the 
exception.

The last remove is not necessarily the culprit; it is just what happened to 
trigger the flush. It could be the culprit, but so could any other batched op. 
That you observe this near checkpoints and watermarks is not surprising, 
because those are two points where flushes are likely to happen.

Do you have any custom modifications that could drop column families? I cannot 
see where a CF could get lost in the vanilla Flink code. Is there any other 
particular circumstance around this happening, e.g. the first flush after a 
restore or something like that?
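
To illustrate the "any batched op could be the culprit" point, here is a 
minimal standalone sketch against plain RocksJava (not the Flink wrapper; the 
class name and db path are made up): a WriteBatch only records operations, and 
the column family handles are validated for the whole batch when db.write() is 
called, so the exception surfaces at whichever op happens to trigger the 
flush. Dropping the CF here is just one way to simulate a handle that is no 
longer valid.

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.nio.charset.StandardCharsets;

// Hypothetical reproducer, not Flink code: class name and db path are made up.
public class WriteBatchInvalidCfSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/write-batch-cf-sketch")) {

            // Create a column family and drop it again, so the handle no
            // longer refers to a live CF ("CF does not exist anymore").
            ColumnFamilyHandle droppedCf = db.createColumnFamily(
                    new ColumnFamilyDescriptor(
                            "dropped-cf".getBytes(StandardCharsets.UTF_8)));
            db.dropColumnFamily(droppedCf);

            try (WriteBatch batch = new WriteBatch();
                 WriteOptions writeOptions = new WriteOptions()) {

                // Batching against the dropped CF succeeds: the batch only
                // records the op, nothing is validated against the DB yet.
                batch.put(droppedCf,
                        "bad".getBytes(StandardCharsets.UTF_8),
                        "value".getBytes(StandardCharsets.UTF_8));

                // A perfectly valid op against the default CF.
                batch.put(db.getDefaultColumnFamily(),
                        "good".getBytes(StandardCharsets.UTF_8),
                        "value".getBytes(StandardCharsets.UTF_8));

                // The whole batch is validated and applied here; this is
                // where "Invalid column family specified in write batch"
                // shows up, even though the last op added was fine.
                db.write(writeOptions, batch);
            } finally {
                droppedCf.close();
            }
        }
    }
}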

Best,
Stefan

> On 15. Jan 2019, at 09:48, Gyula Fóra <gyula.f...@gmail.com> wrote:
> 
> Hi!
> 
> Lately I seem to be hitting a bug in the rocksdb timer service. This happens 
> mostly at checkpoints but sometimes even at watermark:
> 
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
>       at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>       at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
>       at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.rocksdb.RocksDBException: Invalid column family specified in write batch
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
>       at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
>       at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>       at 
> com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
>       at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
>       ... 7 more
> Caused by: org.rocksdb.RocksDBException: Invalid column family specified in 
> write batch
>       at org.rocksdb.RocksDB.write0(Native Method)
>       at org.rocksdb.RocksDB.write(RocksDB.java:602)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)
> 
> Has anyone seen this yet?
> Don't remember seeing this before 1.7
> 
> Gyula
