Hi everyone,
I'm using PyFlink to communicate with HDFS and trying to set the checkpoint
storage to an HDFS path. However, I encountered the following error:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem
for scheme "hdfs"
I'm using Flink 1.17.2 and Hadoop 3.2.1
Here's my Python code (the equivalent Java code works fine):
from pyflink.common import Configuration, SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import (
    CheckpointingMode,
    StreamExecutionEnvironment,
    CheckpointStorage,
    ExternalizedCheckpointCleanup,
)
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

if __name__ == '__main__':
    # Uncomment to impersonate the "hadoop" user when writing checkpoints:
    # import os
    # os.environ["HADOOP_USER_NAME"] = "hadoop"

    conf = Configuration()
    conf.set_integer("rest.port", 10010)

    env = StreamExecutionEnvironment.get_execution_environment(conf)
    env.set_parallelism(1)
    env.enable_checkpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    # Fix for 'No FileSystem for scheme "hdfs"':
    # - hadoop-client-3.2.1.jar is an aggregator artifact and ships no classes.
    # - In Hadoop 3.x, org.apache.hadoop.hdfs.DistributedFileSystem was moved
    #   out of hadoop-hdfs into hadoop-hdfs-client, so that jar must be on the
    #   classpath for the "hdfs" scheme to resolve.
    # Alternative: export HADOOP_CLASSPATH=`hadoop classpath` before launching,
    # which is how Flink's Java deployments normally pick up HDFS support.
    env.add_jars(
        "file:///D:/codes/cpo-py/libs/flink-sql-connector-kafka-1.17.2.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-client-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-common-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-hdfs-3.2.1.jar",
        "file:///D:/codes/cpo-py/libs/hadoop-hdfs-client-3.2.1.jar",
    )

    # Checkpoint to HDFS; retain externalized checkpoints after cancellation.
    checkpoint_config = env.get_checkpoint_config()
    checkpoint_config.set_checkpoint_storage(
        CheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc")
    )
    checkpoint_config.set_max_concurrent_checkpoints(1)
    checkpoint_config.set_min_pause_between_checkpoints(5000)
    checkpoint_config.set_checkpoint_timeout(10000)
    checkpoint_config.set_externalized_checkpoint_cleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    )

    # Read the cpo_dc topic from the latest offsets, values as plain strings.
    kafka_source = (
        KafkaSource.builder()
        .set_bootstrap_servers("node1:9092")
        .set_topics("cpo_dc")
        .set_group_id("cpo_dc")
        .set_starting_offsets(KafkaOffsetsInitializer.latest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    source = env.from_source(
        kafka_source, WatermarkStrategy.no_watermarks(), "kafka_source"
    )
    source.print()
    env.execute()
The equivalent Java code runs without issues:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Streams the cpo_dc Kafka topic and prints every record, with exactly-once
 * checkpoints stored on HDFS.
 */
public class Main {

    public static void main(String[] args) throws Exception {
        //System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration config = new Configuration();
        config.setInteger("rest.port", 10010);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config)
                        .setParallelism(1)
                        .enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);

        // Configure checkpoint storage and retention through one local handle
        // rather than repeated env.getCheckpointConfig() calls.
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://node1:8020/cpo/stream/cpo_dc");
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        checkpointConfig.setMinPauseBetweenCheckpoints(5000);
        checkpointConfig.setCheckpointTimeout(10000);
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // String-valued source reading cpo_dc from the latest offsets.
        KafkaSource<String> kafkaSource =
                KafkaSource.<String>builder()
                        .setBootstrapServers("node1:9092")
                        .setGroupId("cpo_dc")
                        .setTopics("cpo_dc")
                        .setStartingOffsets(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        DataStreamSource<String> source =
                env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        source.print();
        env.execute();
    }
}
Is there something I need to configure differently in PyFlink to enable HDFS
checkpoint storage?
Any help or guidance would be greatly appreciated!
Best regards!