Hi, Gabor,

I'm curious why this happens with the Azure filesystem and not with other
filesystems (I tried S3 and it works fine).

Gabor Somogyi <gabor.g.somo...@gmail.com> wrote on Tue, Jul 2, 2024 at 16:59:

> I see, thanks for sharing.
>
> The change you've made makes sense. Let me explain the details.
> Each and every plugin has its own class loader. The reason behind that is
> to avoid dependency collisions with Flink's main class loader.
>
> I think if the mentioned change works when it's added as a normal lib and
> not as a plugin, then the code can be merged to main as-is.
>
> G
>
>
> On Thu, Jun 27, 2024 at 5:30 AM Xiao Xu <ffxrqy...@gmail.com> wrote:
>
>> Hi, Gabor,
>>
>> Thanks for the reply. I've made sure there is no conflict in Maven: all the
>> Hadoop dependencies are in provided scope,
>> and I checked that my result jar does not contain any service files
>> (src/main/resources/META-INF/services).
>>
>> This is my pom:
>>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>
>>     <groupId>com.test.flink</groupId>
>>     <artifactId>flink-sync</artifactId>
>>     <version>1.0-SNAPSHOT</version>
>>     <packaging>jar</packaging>
>>
>>     <name>Flink Quickstart Job</name>
>>
>>     <properties>
>>           <maven.compiler.source>1.8</maven.compiler.source>
>>           <maven.compiler.target>1.8</maven.compiler.target>
>>           <flink.version>1.18.1</flink.version>
>>           <java.version>1.8</java.version>
>>           <scala.binary.version>2.12</scala.binary.version>
>>           <kafka.version>3.2.0</kafka.version>
>>        <hadoop.version>3.3.4</hadoop.version>
>>        <log4j.version>2.16.0</log4j.version>
>>        <shadeVersion>3.2.0</shadeVersion>
>>     </properties>
>>
>>     <dependencies>
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-java</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>        </dependency>
>>        <!-- 
>> https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-streaming-java</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>        </dependency>
>>        <!-- 
>> https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-clients</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>        </dependency>
>>        <!-- 
>> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-connector-files</artifactId>
>>           <version>${flink.version}</version>
>>        </dependency>
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-connector-kafka</artifactId>
>>           <version>3.1.0-1.18</version>
>>        </dependency>
>>        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
>>        <dependency>
>>           <groupId>org.apache.logging.log4j</groupId>
>>           <artifactId>log4j-slf4j-impl</artifactId>
>>           <version>${log4j.version}</version>
>>           <scope>runtime</scope>
>>           <exclusions>
>>              <exclusion>
>>                 <artifactId>slf4j-api</artifactId>
>>                 <groupId>org.slf4j</groupId>
>>              </exclusion>
>>           </exclusions>
>>        </dependency>
>>        <dependency>
>>           <groupId>org.apache.logging.log4j</groupId>
>>           <artifactId>log4j-api</artifactId>
>>           <version>${log4j.version}</version>
>>           <scope>runtime</scope>
>>        </dependency>
>>        <dependency>
>>           <groupId>org.apache.logging.log4j</groupId>
>>           <artifactId>log4j-core</artifactId>
>>           <version>${log4j.version}</version>
>>           <scope>runtime</scope>
>>        </dependency>
>>
>>        <dependency>
>>           <groupId>org.apache.flink</groupId>
>>           <artifactId>flink-azure-fs-hadoop</artifactId>
>>           <version>${flink.version}</version>
>>           <scope>provided</scope>
>>        </dependency>
>>     </dependencies>
>>     <build>
>>        <plugins>
>>           <plugin>
>>              <groupId>org.apache.maven.plugins</groupId>
>>              <artifactId>maven-assembly-plugin</artifactId>
>>              <version>3.0.0</version>
>>              <configuration>
>>                 <appendAssemblyId>false</appendAssemblyId>
>>                 <descriptorRefs>
>>                    <descriptorRef>jar-with-dependencies</descriptorRef>
>>                 </descriptorRefs>
>>              </configuration>
>>              <executions>
>>                 <execution>
>>                    <id>make-assembly</id>
>>                    <phase>package</phase>
>>                    <goals>
>>                       <goal>single</goal>
>>                    </goals>
>>                 </execution>
>>              </executions>
>>           </plugin>
>>        </plugins>
>>     </build>
>> </project>
>>
>>
>> And as in my reply on Stack Overflow, I found that this line in hadoop-common:
>> https://github.com/apache/hadoop/blob/release-3.3.4-RC1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java#L3374
>> does not load any filesystem. Digging into the ServiceLoader.load(FileSystem.class)
>> source code, it looks like a different class loader causes it not to load
>> any filesystem.
>> I changed ServiceLoader.load(FileSystem.class) to
>> ServiceLoader.load(FileSystem.class, FileSystem.class.getClassLoader()),
>> replaced it in the flink-azure-fs-hadoop plugin, and it works now.
>> So I'm not sure why it works.
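
The change described above amounts to pinning the ServiceLoader lookup to the loader that defined the service interface, instead of the thread context class loader that the one-argument overload uses. A minimal standalone sketch of the difference (using CharSequence as a stand-in service interface, since Hadoop classes aren't available in this self-contained example):

```java
import java.util.ServiceLoader;

public class ServiceLoaderDemo {
    public static void main(String[] args) {
        // ServiceLoader.load(service) resolves META-INF/services files via the
        // *thread context* class loader. Inside a Flink plugin that is the
        // plugin's isolated loader, which may not see the service entries
        // shipped elsewhere -- hence "No FileSystem for scheme \"file\"".
        ServiceLoader<CharSequence> viaContext = ServiceLoader.load(CharSequence.class);

        // The two-argument overload pins the lookup to an explicit loader --
        // here the loader that defined the service interface, mirroring the
        // FileSystem.class.getClassLoader() change described above.
        ServiceLoader<CharSequence> viaOwner =
                ServiceLoader.load(CharSequence.class, CharSequence.class.getClassLoader());

        // CharSequence is only a stand-in for this sketch; neither lookup
        // finds providers, but the loaders consulted differ.
        System.out.println("context providers: " + viaContext.iterator().hasNext());
        System.out.println("owner providers: " + viaOwner.iterator().hasNext());
    }
}
```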
>>
>> Gabor Somogyi <gabor.g.somo...@gmail.com> wrote on Wed, Jun 26, 2024 at 16:52:
>>
>>> Hi Xiao,
>>>
>>> I'm not quite convinced that the Azure plugin broke your workload; I
>>> would take a look at the dependency graph you have in the pom.
>>> Adding multiple deps can conflict in terms of class loader services
>>> (src/main/resources/META-INF/services).
>>>
>>> As an example, you have 2 such dependencies where
>>> org.apache.flink.core.fs.FileSystemFactory is in the jar.
>>> Hadoop core contains "file" and the other one something different. Let's
>>> say you don't use a service-merge plugin in your
>>> Maven project. In that case Hadoop core's `file` entry will simply be
>>> overwritten by the second one.
>>>
>>> Solution: Either avoid deps with conflicting services or add 
>>> ServicesResourceTransformer
>>> to your maven project.
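
For reference, the ServicesResourceTransformer mentioned here is configured on the maven-shade-plugin (it would replace the maven-assembly-plugin in the pom above, since assembly does not merge service files). A rough sketch; the `${shadeVersion}` property is the one already defined in the pom:

```xml
<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>${shadeVersion}</version>
   <executions>
      <execution>
         <phase>package</phase>
         <goals>
            <goal>shade</goal>
         </goals>
         <configuration>
            <transformers>
               <!-- Merges META-INF/services entries from all jars instead of
                    letting the last jar overwrite earlier entries -->
               <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
         </configuration>
      </execution>
   </executions>
</plugin>
```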
>>>
>>> G
>>>
>>>
>>> On Wed, Jun 26, 2024 at 10:16 AM Xiao Xu <ffxrqy...@gmail.com> wrote:
>>>
>>>> Hi, all
>>>>
>>>> I'm trying to use Flink to write to Azure Blob Storage (ADLS). I put
>>>> the flink-azure-fs-hadoop jar in the plugins directory, and when I start my
>>>> write job it shows:
>>>>
>>>> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
>>>> FileSystem for scheme "file"
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
>>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
>>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
>>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>         at
>>>> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
>>>> ~[hadoop-azure-3.3.4.5.1.5.3.jar:?]
>>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
>>>> ~[hadoop-common-3.3.4.5.1.5.3.jar:?]
>>>>
>>>> I searched, and the issue looks like this one:
>>>> https://stackoverflow.com/questions/77238642/apache-flink-azure-abfs-file-sink-error-streaming-unsupportedfilesystemexcep
>>>>
>>>> my env:
>>>> Flink: 1.18.1
>>>> JDK: 1.8
>>>>
>>>> Does anyone else have the same problem?
>>>>
>>>
