Hi,

As the error message says: "The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory."

You can't add filesystem plugins via code with env.add_jars(); they must be placed in their own subfolder under the plugins directory of your Flink installation before the cluster starts, following the instructions in the documentation the error links to. A minimal sketch of the setup is below.
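For example (just a sketch, not from your setup: FLINK_HOME and the jar location are assumptions; the S3 filesystem jars ship in the opt/ directory of the Flink distribution), installing the Presto S3 plugin on a node before starting the cluster could look like this:

import shutil
from pathlib import Path

# Assumed installation directory; adjust to your environment.
FLINK_HOME = Path("/opt/flink")

# Each plugin must live in its own subfolder under plugins/.
plugin_dir = FLINK_HOME / "plugins" / "s3-fs-presto"
plugin_dir.mkdir(parents=True, exist_ok=True)

# The distribution ships the S3 filesystems in opt/; copy the one you need
# and (re)start the cluster so the plugin is picked up at startup.
shutil.copy(FLINK_HOME / "opt" / "flink-s3-fs-presto-1.16.0.jar", plugin_dir)

After a restart, FileSystemCheckpointStorage("s3p://...") should resolve, since flink-s3-fs-presto registers the s3:// and s3p:// schemes, and you can drop the env.add_jars() lines for the S3 filesystem jars; they have no effect for filesystem plugins.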
Best regards,

Martijn

On Wed, Nov 23, 2022 at 1:53 PM Mujahid Niaz <mujahid.niaz...@gmail.com> wrote:

> Hi team,
> Hope everyone is doing well.
>
> We have an issue writing checkpoint metadata to S3 using the PyFlink
> DataStream API. We are using apache-flink==1.16.0. We are able to sink our
> stream to S3, but when it comes to writing checkpoint data, we get the
> following error. We tried paths with the s3://, s3a://, and s3p:// prefixes,
> but it failed with all of them.
>
> (Code is attached after the error message.)
>
> Looking forward to hearing from you. Thank you.
>
> Traceback (most recent call last):
>   File "app.py", line 103, in <module>
>     real_time_data_analytics()
>   File "app.py", line 98, in real_time_data_analytics
>     env.execute('bot_detection_app_local2')
>   File "/test/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 764, in execute
>     return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/test/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/test/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
>     return f(*a, **kw)
>   File "/test/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o49.execute.
> : org.apache.flink.util.FlinkException: Failed to execute job 'bot_detection_app_local2'.
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2075)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>     at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>     at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
>     at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>     at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>     ... 3 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511)
>     at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>     ... 3 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
>     at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:67)
>     at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:330)
>     ... 18 more
>
> import json
> from pyflink.common import Row
> from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, Encoder
> from pyflink.common.typeinfo import Types, BasicType, TypeInformation, BasicTypeInfo
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.checkpoint_storage import FileSystemCheckpointStorage, CheckpointStorage, CustomCheckpointStorage
> from pyflink.datastream.state_backend import FsStateBackend, RocksDBStateBackend, EmbeddedRocksDBStateBackend
> # import redis
> from datetime import datetime
>
> from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
> from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer, KinesisDeserializationSchema
>
>
> # from flatten_json import flatten_json
>
>
> # this function reads data from the data stream as json objects one by one and
> # returns a new data object after adding a new field; it is used in the
> # real_time_data_analytics function
>
> # redis_host = "redis"
> # redis_port = 6379
>
>
> def map_events(obj):
>     print('*********************************************')
>     columns = obj.split('\t')
>     print(columns[0])
>     print(obj)
>     return json.dumps(obj)
>
>
> # json_obj = json.loads(obj)
> # return json.dumps(json_obj)
>
>
> # def get_value_from_redis(key):
> #     r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
> #     data = r.get(key)
> #     json_obj = json.loads(data)
> #     return json_obj
>
>
> def real_time_data_analytics():
>     # 1. create a StreamExecutionEnvironment
>     # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
>     # env.add_jars("file:///app/f2.jar")
>     # 2. create source DataStream
>     deserialization_schema = SimpleStringSchema()
>
>     # Initializing Kafka consumer so that we can read the data sent by producer
>     consumer_config = {
>         'aws.region': 'us-central-1',
>         'flink.stream.initpos': 'LATEST'
>     }
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///app/f3.jar")
>     env.add_jars("file:///app/flink-s3-fs-hadoop-1.16.0.jar")
>     env.add_jars("file:///app/flink-s3-fs-presto-1.16.0.jar")
>     env.add_jars("file:///app/aws-java-sdk-core-1.12.347.jar")
>     env.add_jars("file:///app/flink-statebackend-rocksdb-1.16.0.jar")
>     env.add_jars("file:///app/hadoop-common-3.3.4.jar")
>     print(env.__dict__)
>     env.set_state_backend(EmbeddedRocksDBStateBackend())
>     env.get_checkpoint_config().set_checkpoint_storage(FileSystemCheckpointStorage("s3p://myBucket/bot_detection_test/checkpoints/"))
>     env.enable_checkpointing(1000)
>
>     kinesis = env.add_source(
>         FlinkKinesisConsumer("mystream", SimpleStringSchema(), consumer_config))
>     kinesis_filtered = kinesis.map(map_events, Types.STRING())
>
>     sink = FileSink \
>         .for_row_format("s3a://myBucket/bot_detection_test/",
>                         Encoder.simple_string_encoder("UTF-8")) \
>         .with_rolling_policy(RollingPolicy.default_rolling_policy(part_size=1024 * 1024 * 1,
>                                                                   rollover_interval=5 * 60 * 1000,
>                                                                   inactivity_interval=5 * 60 * 1000)) \
>         .build()
>
>     kinesis_filtered.sink_to(sink)
>     # kinesis_filtered.print()
>     env.execute('bot_detection_app_local2')
>
>
> # running the above function in main
> if __name__ == '__main__':
>     real_time_data_analytics()
>
>
> --
>
> Regards,
>
> Mujahid Niaz
>
> LinkedIn: https://www.linkedin.com/in/mujahidniaz/
> Portfolio: http://mujahidniaz.github.io/