Hi Andrea

    This is caused by Flink's class loading. Flink's parent classloader loads 
the RocksDB jar first, and when your user code then references the same 
classes, the user-code classloader loads them a second time, which raises the 
error. There are two solutions.
    1. Add the "provided" scope to the dependency in your Maven pom.xml:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>${flink_version}</version>
            <scope>provided</scope>
        </dependency>
    2. Set classloader.resolve-order: parent-first in flink-conf.yaml
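
If the job is built with sbt rather than Maven (an assumption; the thread does not say which build tool is used), the equivalent of solution 1 might look roughly like the sketch below. The `flinkVersion` value is a placeholder name, set to the 1.6.4 mentioned in the subject line, and `%%` only resolves to the `_2.11` artifact if the build's `scalaVersion` is a 2.11.x release.

```scala
// build.sbt (sketch): mark the RocksDB state backend as "provided" so it is
// available at compile time but not packaged into the job's fat jar.
val flinkVersion = "1.6.4" // placeholder, taken from the subject line

libraryDependencies +=
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion % "provided"
```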


Hope this will help you.


Thanks,
Simon
On 06/24/2019 11:27,Yun Tang<myas...@live.com> wrote:
Hi Andrea


Although I have not written Scala for a while, I wonder why you need to 
instantiate your ColumnFamilyOptions, BlockBasedTableConfig and DBOptions on 
the JM side. As far as I can see, you could instantiate them on the TM side 
instead, with code like this:


val rocksdbConfig = new OptionsFactory() {
  override def createDBOptions(currentOptions: DBOptions): DBOptions =
    currentOptions.setIncreaseParallelism(properties.threadNo)

  override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions =
    currentOptions.setWriteBufferSize(MemorySize.parseBytes(properties.writeBufferSize))
}

You just need to serialize the properties to the TMs via the closure. Hope 
this helps.
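
A fuller sketch of this idea, adapted from the code in the original question, might look like the following. `RocksDbSettings` and `TaskSideRocksDB` are hypothetical names standing in for the poster's `FlinkDeployment` and `ConfigurableRocksDB`. The sketch assumes the Flink 1.6 `OptionsFactory` interface used elsewhere in this thread and has not been run against a cluster.

```scala
import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

// Hypothetical stand-in for FlinkDeployment: plain, serializable values only.
final case class RocksDbSettings(
    checkpointDir: String,
    incremental: Boolean = false,
    threadNo: Option[Int] = None,
    blockSize: Option[String] = None,
    cacheSize: Option[String] = None,
    writeBufferSize: Option[String] = None)

object TaskSideRocksDB {

  // The factory captures only `settings`. No org.rocksdb object is created
  // here; they are built or mutated when Flink calls the two methods below.
  def optionsFactory(settings: RocksDbSettings): OptionsFactory = new OptionsFactory {

    override def createDBOptions(currentOptions: DBOptions): DBOptions = {
      settings.threadNo.foreach(n => currentOptions.setIncreaseParallelism(n))
      currentOptions
    }

    override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = {
      val tableConfig = new BlockBasedTableConfig()
      settings.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
      settings.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
      settings.writeBufferSize.foreach(wbs =>
        currentOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))
      currentOptions.setTableFormatConfig(tableConfig)
    }
  }

  // Builds the backend on the client side; only the factory and the settings
  // it captured are serialized with it.
  def stateBackend(settings: RocksDbSettings): RocksDBStateBackend = {
    val backend = new RocksDBStateBackend(settings.checkpointDir, settings.incremental)
    backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    backend.setOptions(optionsFactory(settings))
    backend
  }
}
```

Because the factory holds only a case class of plain values, Java serialization ships just those values, and the RocksDB option objects are touched only inside the callbacks.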

Best
Yun Tang

From: Andrea Spina <andrea.sp...@radicalbit.io>
Sent: Monday, June 24, 2019 2:20
To: user
Subject: Linkage Error RocksDB and flink-1.6.4
 
Dear community,
I am running a Flink job backed by RocksDB, on Flink 1.6.4 and Scala 2.11. At 
job startup the following exception is thrown (it is recorded by the 
JobManager).

Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders$ChildFirstClassLoader) previously initiated loading for a different type with name "org/rocksdb/DBOptions"
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:126)



For this job, I programmatically set some RocksDB options using the code 
appended below. Can anybody help with this? Thank you so much,
Andrea


import org.apache.flink.configuration.MemorySize
import org.apache.flink.contrib.streaming.state.{OptionsFactory, PredefinedOptions, RocksDBStateBackend}
import org.rocksdb.{BlockBasedTableConfig, ColumnFamilyOptions, DBOptions}

object ConfigurableRocksDB {

  lazy val columnOptions = new ColumnFamilyOptions() with Serializable
  lazy val tableConfig   = new BlockBasedTableConfig() with Serializable
  lazy val dbOptions     = new DBOptions() with Serializable

  def configureStateBackendRocksDB(properties: FlinkDeployment): RocksDBStateBackend = {
    properties.threadNo.foreach(dbOptions.setIncreaseParallelism)

    properties.blockSize.foreach(bs => tableConfig.setBlockSize(MemorySize.parseBytes(bs)))
    properties.cacheSize.foreach(cs => tableConfig.setBlockCacheSize(MemorySize.parseBytes(cs)))
    properties.cacheIndicesAndFilters.foreach(cif => if (cif) tableConfig.cacheIndexAndFilterBlocks())
    properties.writeBufferSize.foreach(wbs => columnOptions.setWriteBufferSize(MemorySize.parseBytes(wbs)))

    columnOptions.setTableFormatConfig(tableConfig)
    properties.writeBufferToMerge.foreach(bm => columnOptions.setMinWriteBufferNumberToMerge(bm))
    properties.writeBufferCount.foreach(bc => columnOptions.setMaxWriteBufferNumber(bc))
    properties.optimizeFilterForHits.foreach(op => if (op) columnOptions.optimizeFiltersForHits())

    val rocksdbConfig = new OptionsFactory() {
      override def createDBOptions(currentOptions: DBOptions): DBOptions = dbOptions
      override def createColumnOptions(currentOptions: ColumnFamilyOptions): ColumnFamilyOptions = columnOptions
    }

    val stateBE =
      new RocksDBStateBackend(properties.checkpointDir.get, properties.checkpointIncremental.getOrElse(false))
    stateBE.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
    stateBE.setOptions(rocksdbConfig)

    stateBE
  }

}
--

Andrea Spina
Head of R&D @ Radicalbit Srl
Via Giovanni Battista Pirelli 11, 20124, Milano - IT
