Dear community,

I am running a Flink job backed by RocksDB, on Flink 1.6.4 with Scala 2.11. At job startup the following exception is thrown (it is recorded by the JobManager):
*Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)*

For this job, I set some RocksDB options programmatically, using the code appended below. Can anybody help with this?
Thank you so much,
Andrea

import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }
}

--
*Andrea Spina*
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT