Hi,

As the error message says: "The scheme is directly supported by Flink through
the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin
resides within its own subfolder within the plugins directory."

You can't load file system plugins via add_jars() in your code; the plugin
JARs have to be placed in the plugins directory of your Flink installation
before the cluster starts, as described in the documentation.
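
For reference, assuming a standard Flink distribution layout (the version
number and subfolder name below are just illustrative), the result should
look roughly like this:

    flink-1.16.0/
        lib/
        plugins/
            s3-fs-presto/
                flink-s3-fs-presto-1.16.0.jar

Alternatively, as the error also suggests, if you want to use a Hadoop file
system for that scheme instead, you can add the scheme to the
fs.allowed-fallback-filesystems option in flink-conf.yaml.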

Best regards,

Martijn

On Wed, Nov 23, 2022 at 1:53 PM Mujahid Niaz <mujahid.niaz...@gmail.com>
wrote:

> Hi team,
> Hope everyone is doing well.
>
> We have an issue with writing checkpoint metadata to S3 using the PyFlink
> DataStream API. We are using apache-flink==1.16.0. We are able to sink our
> stream into S3, but when it comes to writing checkpoint data we get the
> following error. We tried paths with the *s3://*, *s3a://*, and *s3p://*
> prefixes, but it failed with all of them.
>
> *(Code is attached after the error message)*
>
> Looking forward to hearing from you. Thank you
>
> Traceback (most recent call last):
>   File "app.py", line 103, in <module>
>     real_time_data_analytics()
>   File "app.py", line 98, in real_time_data_analytics
>     env.execute('bot_detection_app_local2')
>   File "/test/lib/python3.8/site-packages/pyflink/datastream/stream_execution_environment.py", line 764, in execute
>     return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
>   File "/test/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/test/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
>     return f(*a, **kw)
>   File "/test/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o49.execute.
> : org.apache.flink.util.FlinkException: Failed to execute job 'bot_detection_app_local2'.
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2075)
>         at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>         at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>         at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>         at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>         at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
>         at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
>         at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>         at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:479)
>         at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
>         at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>         at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
>         at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
>         at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>         at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>         at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>         at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>         at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>         at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
>         at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>         at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>         at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>         ... 3 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
>         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:337)
>         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:245)
>         at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.enableCheckpointing(DefaultExecutionGraph.java:511)
>         at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:317)
>         at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
>         at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>         at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
>         at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
>         at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
>         at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
>         at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
>         at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>         at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>         at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>         ... 3 more
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
>         at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
>         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
>         at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess.<init>(FsCheckpointStorageAccess.java:67)
>         at org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage.createCheckpointStorage(FileSystemCheckpointStorage.java:323)
>         at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:330)
>         ... 18 more
>
> import json
> from pyflink.common import Row
> from pyflink.common.serialization import SimpleStringSchema, DeserializationSchema, Encoder
> from pyflink.common.typeinfo import Types, BasicType, TypeInformation, BasicTypeInfo
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.checkpoint_storage import FileSystemCheckpointStorage, CheckpointStorage, CustomCheckpointStorage
> from pyflink.datastream.state_backend import FsStateBackend, RocksDBStateBackend, EmbeddedRocksDBStateBackend
> # import redis
> from datetime import datetime
>
> from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy
> from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer, KinesisDeserializationSchema
>
>
> # from flatten_json import flatten_json
>
>
> # This function reads records from the data stream one by one as JSON
> # objects and returns a new object after adding a new field. It is used
> # in real_time_data_analytics.
>
> # redis_host = "redis"
> # redis_port = 6379
>
>
> def map_events(obj):
>     # Split the raw record on tabs, log the first column and the full
>     # record, then return the record re-encoded as a JSON string.
>     print('*********************************************')
>     columns = obj.split('\t')
>     print(columns[0])
>     print(obj)
>     return json.dumps(obj)
>
>
> # json_obj = json.loads(obj)
> # return json.dumps(json_obj)
>
>
> # def get_value_from_redis(key):
> #     r = redis.StrictRedis(host=redis_host, port=redis_port, decode_responses=True)
> #     data = r.get(key)
> #     json_obj = json.loads(data)
> #     return json_obj
>
>
> def real_time_data_analytics():
>     # 1. create a StreamExecutionEnvironment
>     # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
>     # env.add_jars("file:///app/f2.jar")
>     # 2. create source DataStream
>     deserialization_schema = SimpleStringSchema()
>
>     # Initialize the Kinesis consumer config so that we can read the data sent by the producer
>
>     consumer_config = {
>         'aws.region': 'us-central-1',
>         'flink.stream.initpos': 'LATEST'
>     }
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.add_jars("file:///app/f3.jar")
>     env.add_jars("file:///app/flink-s3-fs-hadoop-1.16.0.jar")
>     env.add_jars("file:///app/flink-s3-fs-presto-1.16.0.jar")
>     env.add_jars("file:///app/aws-java-sdk-core-1.12.347.jar")
>     env.add_jars("file:///app/flink-statebackend-rocksdb-1.16.0.jar")
>     env.add_jars("file:///app/hadoop-common-3.3.4.jar")
>     print(env.__dict__)
>     env.set_state_backend(EmbeddedRocksDBStateBackend())
>     env.get_checkpoint_config().set_checkpoint_storage(
>         FileSystemCheckpointStorage("s3p://myBucket/bot_detection_test/checkpoints/"))
>
>     env.enable_checkpointing(1000)
>     kinesis = env.add_source(
>         FlinkKinesisConsumer("mystream", SimpleStringSchema(), consumer_config))
>     kinesis_filtered = kinesis.map(map_events, Types.STRING())
>
>     sink = FileSink \
>         .for_row_format("s3a://myBucket/bot_detection_test/",
>                         Encoder.simple_string_encoder("UTF-8")) \
>         .with_rolling_policy(RollingPolicy.default_rolling_policy(
>             part_size=1024 * 1024 * 1,
>             rollover_interval=5 * 60 * 1000,
>             inactivity_interval=5 * 60 * 1000)) \
>         .build()
>
>     kinesis_filtered.sink_to(sink)
>     # kinesis_filtered.print()
>     env.execute('bot_detection_app_local2')
>
>
> # running the above function in main
> if __name__ == '__main__':
>     real_time_data_analytics()
>
>
> --
>
> Regards,
>
> *Mujahid Niaz*
>
> *LinkedIn: *https://www.linkedin.com/in/mujahidniaz/
> *Portfolio: *http://mujahidniaz.github.io/
>
