Hi Andrea
Actually It’s caused by Flink’s ClassLoader. It’s because flink use parent Classloader to load jar first and then you use it in your user’s code, then user-code classloader will load it again so it raised the error. There are two solutions. 1. Add scope “provided” to maven pom.xml <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink_version}</version> <scope>provided</scope> </dependency> 2. Set this classloader.resolve-order: parent-first in flink-conf.yml Hope this will help you. Thanks, Simon On 06/24/2019 11:27,Yun Tang<myas...@live.com> wrote: Hi Andrea Since I have not written Scala for a while, I wonder why you need to instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on JM side. As far as I can see, you could instantiate your on your TM side like code: val rocksdbConfig = new OptionsFactory() { override def createDBOptions(currentOptions: DBOptions): DBOptions = currentOptions.setIncreaseParallelism(properties.threadNo) override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize)) } You just need to serialize theproperties via closure to TMs. Hope this could help you. Best Yun Tang From: Andrea Spina <andrea.sp...@radicalbit.io> Sent: Monday, June 24, 2019 2:20 To: user Subject: Linkage Error RocksDB and flink-1.6.4 Dear community, I am running a Flink Job backed by RocksDB, version 1.6.4 and scala 2.11. At the job Startp the following exception happens (it's recorded by the Job Manager). Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions" at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) at java.net.URLClassLoader.access$100(URLClassLoader.java:74) at java.net.URLClassLoader$1.run(URLClassLoader.java:369) at java.net.URLClassLoader$1.run(URLClassLoader.java:363) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:362) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126) For this job, I programmatically set some RocksDB options by using the code appended below. Anybody can help with this? Thank you so much, Andrea import org.apache.flink.configuration.MemorySize import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend} import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions} object ConfigurableRocksDB { lazy val columnOptions = new ColumnFamilyOptions() with Serializable lazy val tableConfig = new BlockBasedTableConfig() with Serializable lazy val dbOptions = new DBOptions() with Serializable def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = { properties.threadNo.foreach(dbOptions.setIncreaseParallelism) properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs))) properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs))) properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks()) properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs))) columnOptions.setTableFormatConfig(tableConfig) properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm)) properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc)) properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits()) val rocksdbConfig = new OptionsFactory() { override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions } val stateBE = new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false)) stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED) stateBE.setOptions(rocksdbConfig) stateBE } } -- Andrea Spina Head of R&D @ Radicalbit Srl Via Giovanni Battista Pirelli 11, 20124, Milano - IT