Hi,
I am running Flink 1.10.1, initially on my local development machine (a MacBook
Pro). I'm struggling to understand how to write to Google Cloud Storage using
the StreamingFileSink (S3 works fine).
The error I am seeing:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)
I have put gcs-connector-hadoop2-latest.jar in its own subdirectory under plugins/:
plugins
├── gcs-connector
│ └── gcs-connector-hadoop2-latest.jar
In flink-conf.yaml I have added:
fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: ~/key.json
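
For reference, my understanding is that the GCS connector is normally configured through a Hadoop core-site.xml, so the flink-conf.yaml entries above are meant to express the equivalent of the following (the keyfile path is just a placeholder for my local key):

    <configuration>
      <!-- Map the gs:// scheme to the GCS connector's Hadoop FileSystem -->
      <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
      </property>
      <!-- Authenticate with a service account JSON keyfile -->
      <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
      </property>
      <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value>/path/to/key.json</value>
      </property>
    </configuration>

I'm not sure whether Flink actually forwards these keys from flink-conf.yaml to the Hadoop file system, so part of my question is whether I need a separate core-site.xml for this to take effect.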
This mirrors the setup I used for s3 storage.
My implementation is a simple test that reads data from a Kinesis stream and
writes it to GCS:
    DataStream<String> input = getKinesisSource(env, kinesisStream);

    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("gs://some-gcp-bucket"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
        .build();

    // input.print();
    input.addSink(sink);
I'm not sure what else to try. Any pointers appreciated.