Hi, Gabor,
I'm curious why this happens with the Azure filesystem and not with other filesystems (I tried S3 and it works fine).

On Tue, Jul 2, 2024 at 16:59, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> I see, thanks for sharing.
>
> The change you've made makes sense. Let me explain the details.
> Each and every plugin has its own class loader. The reason behind that is
> to avoid dependency collisions with Flink's main class loader.
>
> I think if the mentioned change works when it's added as a normal lib and
> not as a plugin, then the code can be merged to main as-is.
>
> G
>
>
> On Thu, Jun 27, 2024 at 5:30 AM Xiao Xu <ffxrqy...@gmail.com> wrote:
>
>> Hi Gabor,
>>
>> Thanks for the reply. I made sure there is no conflict in Maven: all the
>> Hadoop dependencies are in provided scope, and I checked that my resulting
>> jar does not contain any (src/main/resources/META-INF/services) entries.
>>
>> This is my pom:
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>>                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>
>>   <groupId>com.test.flink</groupId>
>>   <artifactId>flink-sync</artifactId>
>>   <version>1.0-SNAPSHOT</version>
>>   <packaging>jar</packaging>
>>
>>   <name>Flink Quickstart Job</name>
>>
>>   <properties>
>>     <maven.compiler.source>1.8</maven.compiler.source>
>>     <maven.compiler.target>1.8</maven.compiler.target>
>>     <flink.version>1.18.1</flink.version>
>>     <java.version>1.8</java.version>
>>     <scala.binary.version>2.12</scala.binary.version>
>>     <kafka.version>3.2.0</kafka.version>
>>     <hadoop.version>3.3.4</hadoop.version>
>>     <log4j.version>2.16.0</log4j.version>
>>     <shadeVersion>3.2.0</shadeVersion>
>>   </properties>
>>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-java</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-java</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-clients</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>     <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-files</artifactId>
>>       <version>${flink.version}</version>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-connector-kafka</artifactId>
>>       <version>3.1.0-1.18</version>
>>     </dependency>
>>     <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
>>     <dependency>
>>       <groupId>org.apache.logging.log4j</groupId>
>>       <artifactId>log4j-slf4j-impl</artifactId>
>>       <version>${log4j.version}</version>
>>       <scope>runtime</scope>
>>       <exclusions>
>>         <exclusion>
>>           <artifactId>slf4j-api</artifactId>
>>           <groupId>org.slf4j</groupId>
>>         </exclusion>
>>       </exclusions>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.logging.log4j</groupId>
>>       <artifactId>log4j-api</artifactId>
>>       <version>${log4j.version}</version>
>>       <scope>runtime</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.logging.log4j</groupId>
>>       <artifactId>log4j-core</artifactId>
>>       <version>${log4j.version}</version>
>>       <scope>runtime</scope>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-azure-fs-hadoop</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>provided</scope>
>>     </dependency>
>>   </dependencies>
>>   <build>
>>     <plugins>
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-assembly-plugin</artifactId>
>>         <version>3.0.0</version>
>>         <configuration>
>>           <appendAssemblyId>false</appendAssemblyId>
>>           <descriptorRefs>
>>             <descriptorRef>jar-with-dependencies</descriptorRef>
>>           </descriptorRefs>
>>         </configuration>
>>         <executions>
>>           <execution>
>>             <id>make-assembly</id>
>>             <phase>package</phase>
>>             <goals>
>>               <goal>single</goal>
>>             </goals>
>>           </execution>
>>         </executions>
>>       </plugin>
>>     </plugins>
>>   </build>
>> </project>
>>
>>
>> And as in my reply on Stack Overflow, I found that this code in hadoop-common:
>> https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
>> does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
>> source code, it looks like a different class loader is used, which makes it load
>> no filesystems at all.
>> I changed ServiceLoader.load(FileSystem.class) to
>> ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader()),
>> replaced it in the flink-azure-fs-hadoop plugin, and it works now.
>> So I'm not sure why that works.
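>> Roughly, the change looks like this in FileSystem#loadFileSystems (my sketch
>> against the 3.3.4 source linked above; variable names may differ slightly):
>>
>>   // before: ServiceLoader.load(Class) uses the thread context class loader,
>>   // which here apparently is not the loader that can see hadoop-common's
>>   // META-INF/services/org.apache.hadoop.fs.FileSystem entries
>>   ServiceLoader<FileSystem> serviceLoader =
>>       ServiceLoader.load(FileSystem.class);
>>
>>   // after: pin the lookup to the class loader that loaded FileSystem itself
>>   // (inside the Flink plugin that is the plugin class loader), so the
>>   // "file", "abfs", ... providers are registered again
>>   ServiceLoader<FileSystem> serviceLoader =
>>       ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader());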
>>
>> On Wed, Jun 26, 2024 at 16:52, Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> Hi Xiao,
>>>
>>> I'm not quite convinced that the Azure plugin broke your workload; I
>>> would take a look at the dependency graph you have in the pom.
>>> Adding multiple deps can conflict in terms of class loader services
>>> (src/main/resources/META-INF/services).
>>>
>>> As an example, you have two such dependencies where
>>> org.apache.flink.core.fs.FileSystemFactory is in the jar.
>>> Hadoop core contains "file" and the other one something different. Let's
>>> say you don't use a service-merging plugin in your Maven project. In that
>>> case the Hadoop core `file` entry will simply be overwritten by the second
>>> one.
>>>
>>> Solution: Either avoid deps with conflicting services or add the
>>> ServicesResourceTransformer to your Maven project.
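>>> With the maven-shade-plugin that would look roughly like this (just a
>>> sketch; the version is an example and the transformer can also sit in the
>>> plugin-level configuration):
>>>
>>>   <plugin>
>>>     <groupId>org.apache.maven.plugins</groupId>
>>>     <artifactId>maven-shade-plugin</artifactId>
>>>     <version>3.2.0</version>
>>>     <executions>
>>>       <execution>
>>>         <phase>package</phase>
>>>         <goals>
>>>           <goal>shade</goal>
>>>         </goals>
>>>         <configuration>
>>>           <transformers>
>>>             <!-- merges META-INF/services files of the same name instead of
>>>                  letting the last dependency on the classpath win -->
>>>             <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>>>           </transformers>
>>>         </configuration>
>>>       </execution>
>>>     </executions>
>>>   </plugin>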
>>>
>>> G
>>>
>>> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu <ffxrqy...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to use Flink to write to Azure Blob Storage (ADLS). I put the
>>>> flink-azure-fs-hadoop jar in the plugins directory, and when I start my
>>>> write job it shows:
>>>>
>>>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
>>>>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262) ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173) ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580) ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301) ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052) ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>
>>>> I searched and the issue looks like this one:
>>>> https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep
>>>>
>>>> My env:
>>>> Flink: 1.18.1
>>>> JDK: 1.8
>>>>
>>>> Does anyone else have the same problem?