Now I am able to write checkpoints but cannot restore from them:

java.lang.NoClassDefFoundError: com/google/cloud/hadoop/gcsio/GoogleCloudStorageImpl$6
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl.open(GoogleCloudStorageImpl.java:666)
    at com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem.open(GoogleCloudStorageFileSystem.java:323)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.<init>(GoogleHadoopFSInputStream.java:136)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.open(GoogleHadoopFileSystemBase.java:1102)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:80)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1005)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:974)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
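The class that cannot be found, GoogleCloudStorageImpl$6, is an anonymous inner class of the connector, so the connector appears to be only partially visible to the task manager at restore time, e.g. a minimized/shaded job jar that dropped inner classes, or a different connector jar on the cluster classpath than in the job. A minimal sketch of one thing to try, assuming (not verified here) that the gcs-connector publishes a self-contained artifact under a "shaded" classifier for this version, is to depend on that artifact so all connector classes travel together:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-1.9.5</version>
    <!-- Assumption: a "shaded" classifier is published for this version.
         The shaded jar bundles the connector with its inner classes
         (such as GoogleCloudStorageImpl$6) and its dependencies. -->
    <classifier>shaded</classifier>
</dependency>

The same jar can also be placed in the Flink lib/ directory of the job manager and task manager images, so the classes are visible to the cluster classloader during restore and not only when checkpoints are written.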
My current setup:

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>gcs-connector</artifactId>
    <version>hadoop2-1.9.5</version>
</dependency>

On Thu, Aug 16, 2018 at 11:55 AM, Oleksandr Serdiukov <d...@serdukoff.me> wrote:

> Hello All!
>
> I am trying to configure checkpoints for Flink jobs in GCS.
> Unfortunately, it fails after submitting a job. I run it using
> docker-compose on my local machine.
>
> Any thoughts on this?
> Thanks!
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:405)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:61)
>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:441)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:379)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:247)
>     ... 33 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
>     at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>
> Env configuration is like this:
>
> StreamExecutionEnvironment env = applicationContext.getBean(StreamExecutionEnvironment.class);
> CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> checkpointConfig.setFailOnCheckpointingErrors(false);
> checkpointConfig.setCheckpointInterval(10000);
> checkpointConfig.setMinPauseBetweenCheckpoints(5000);
> checkpointConfig.setMaxConcurrentCheckpoints(1);
> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(
>         String.format("gs://checkpoints/%s", jobClass.getSimpleName()), true);
> env.setStateBackend((StateBackend) rocksDBStateBackend);
>
> Here is my `core-site.xml` file:
>
> <configuration>
>     <property>
>         <name>google.cloud.auth.service.account.enable</name>
>         <value>true</value>
>     </property>
>     <property>
>         <name>google.cloud.auth.service.account.json.keyfile</name>
>         <value>${user.dir}/key.json</value>
>     </property>
>     <property>
>         <name>fs.gs.impl</name>
>         <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
>         <description>The FileSystem for gs: (GCS) uris.</description>
>     </property>
>     <property>
>         <name>fs.AbstractFileSystem.gs.impl</name>
>         <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
>         <description>The AbstractFileSystem for gs: (GCS) uris.</description>
>     </property>
>     <property>
>         <name>fs.gs.application.name.suffix</name>
>         <value>-kube-flink</value>
>         <description>
>             Appended to the user-agent header for API requests to GCS to help identify
>             the traffic as coming from Dataproc.
>         </description>
>     </property>
> </configuration>
>
> Dependencies for gcs-connector and Hadoop:
>
> <dependency>
>     <groupId>com.google.cloud.bigdataoss</groupId>
>     <artifactId>gcs-connector</artifactId>
>     <version>1.9.4-hadoop2</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.hadoop</groupId>
>     <artifactId>hadoop-common</artifactId>
>     <version>2.9.1</version>
> </dependency>
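For the quoted "Could not find a file system implementation for scheme 'gs'" error, a quick way to see whether the scheme resolves from a given classpath is to call Flink's own Path.getFileSystem(), the same call that fails in the quoted trace. This is only an illustrative sketch: the class name GcsSchemeCheck and the bucket path are made up, and it assumes flink-core, hadoop-common, the gcs-connector and a discoverable core-site.xml are on that classpath.

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class GcsSchemeCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical path; it is only used to resolve the 'gs' scheme, no data is read.
        Path checkpointPath = new Path("gs://checkpoints/scheme-check");
        // Throws UnsupportedFileSystemSchemeException when neither a Flink file system
        // nor a Hadoop file system for 'gs' can be loaded -- the failure quoted above.
        FileSystem fs = checkpointPath.getFileSystem();
        System.out.println("Resolved 'gs' to: " + fs.getClass().getName());
    }
}

If this resolves locally (typically to Flink's Hadoop file system wrapper) but the job still fails on the cluster, the connector and Hadoop are most likely missing from the Flink containers rather than from the job jar.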