Hi,

I am running Flink 1.10.1, initially on my local development machine (a MacBook
Pro). I'm struggling to understand how to write to Google Cloud Storage using
the StreamingFileSink (S3 works fine).

The error I am seeing:

"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"

I have put gcs-connector-hadoop2-latest.jar in a subdirectory under plugins/:

plugins
└── gcs-connector
    └── gcs-connector-hadoop2-latest.jar
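For concreteness, this is roughly how the jar ended up there (a sketch; the FLINK_HOME path and the use of a placeholder file instead of the real jar are just for illustration):

```shell
# Sketch of the plugin layout described above.
# FLINK_HOME defaults to an assumed path; an empty placeholder file
# stands in for the real connector jar in this sketch.
FLINK_HOME=${FLINK_HOME:-/tmp/flink-1.10.1}
mkdir -p "$FLINK_HOME/plugins/gcs-connector"
touch "$FLINK_HOME/plugins/gcs-connector/gcs-connector-hadoop2-latest.jar"
ls "$FLINK_HOME/plugins/gcs-connector"
```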

In flink-conf.yaml I have added:

fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: ~/key.json

This mirrors the setup I used for S3 storage.

My implementation is a simple test that reads data from a Kinesis stream and
writes it to GCS.

DataStream<String> input = getKinesisSource(env, kinesisStream);

final StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("gs://some-gcp-bucket"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build();

//input.print();
input.addSink(sink);

Not sure what else to try. Any pointers appreciated.

