Hey Fabian,

Many thanks for your clarifications! It seems flink-shaded-hadoop2
itself is already included in the binary distribution:

> $ jar tf flink-1.2.0/lib/flink-dist_2.10-1.2.0.jar \
>     | grep org/apache/hadoop | head -n3
> org/apache/hadoop/
> org/apache/hadoop/fs/
> org/apache/hadoop/fs/FileSystem$Statistics$StatisticsAggregator.class
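
For the record, the fix on my side was simply to drop the compatibility
jar into the distribution's `lib` directory (the jar's location on my
machine is assumed here):

> $ cp flink-hadoop-compatibility_2.10-1.2.0.jar flink-1.2.0/lib/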

That's why adding just the hadoop-compatibility jar fixed the problem
for me. I'm not yet familiar with how Flink handles class loading, but
at first glance into `TypeExtractor` I was surprised to see it _not_
using the thread's current context class loader [1] (with a fallback to
its own class loader); see the sketch below. This led me to inspect the
jars' contents and find the problem. I'll open a JIRA ticket for this
issue on Monday.
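
For illustration, this is roughly the lookup I had expected (just a
sketch, not Flink's actual code; the method signature is made up):

> // Prefer the thread's context class loader; if none is set, fall
> // back to the class loader that loaded the given fallback class.
> static Class<?> loadClass(String name, Class<?> fallback)
>         throws ClassNotFoundException {
>     ClassLoader cl = Thread.currentThread().getContextClassLoader();
>     if (cl == null) {
>         cl = fallback.getClassLoader();
>     }
>     return Class.forName(name, false, cl);
> }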

Have a nice weekend,
P.

[1]
http://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader



On 04/07/2017 09:24 PM, Fabian Hueske wrote:
> Hi Petr,
> 
> I think that's expected behavior because the exception is intercepted
> and enriched with an instruction to solve the problem.
> As you assumed, you need to add the flink-hadoop-compatibility JAR file
> to the ./lib folder. Unfortunately, the file is not included in the
> binary distribution.
> You can either build it from source or manually download it from a
> public Maven repository. You might need to add the flink-shaded-hadoop2
> jar file as well, which is a dependency of flink-hadoop-compatibility.
> 
> I think we should make that easier for users and add a pre-built jar
> file to the ./opt folder of the binary distribution.
> Would you mind opening a JIRA for this?
> 
> Now a bit of background on why we moved the TypeInfo to
> flink-hadoop-compatibility. We are preparing Flink's core to become
> independent of Hadoop, i.e., Flink core should not require Hadoop. We
> will of course keep the option to run Flink on YARN and write data to
> HDFS, but this should be optional and not baked into the core.
> 
> Best, Fabian
> 
> 
> 
> 2017-04-07 16:27 GMT+02:00 Petr Novotnik <petr.novot...@firma.seznam.cz>:
> 
>     Hello,
> 
>     with 1.2.0, `WritableTypeInfo` got moved into its own artifact
>     (flink-hadoop-compatibility_2.10-1.2.0.jar). Unlike in 1.1.0, the
>     distribution jar `flink-dist_2.10-1.2.0.jar` does not include the
>     hadoop compatibility classes anymore. However, `TypeExtractor`,
>     which is part of the distribution jar, tries to load
>     `WritableTypeInfo` using the class loader it was itself loaded from:
> 
>     > Class<?> typeInfoClass;
>     > try {
>     >     typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS,
>     >             false, TypeExtractor.class.getClassLoader());
>     > }
>     > catch (ClassNotFoundException e) {
>     >     throw new RuntimeException("Could not load the TypeInformation for the class '"
>     >             + HADOOP_WRITABLE_CLASS
>     >             + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
>     > }
> 
>     Adding `flink-hadoop-compatibility` to my application jar leads to
>     the following stack trace on YARN (running `bin/flink run -m
>     yarn-cluster...`):
> 
>     > Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
>     >       at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
>     >       at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
>     >       at cz.seznam.euphoria.benchmarks.flink.Util$2.<init>(Util.java:80)
> 
>     I guess I'm supposed to customize my Flink installation by adding
>     the hadoop-compatibility jar to Flink's `lib` dir, correct? If so,
>     is this documented? I couldn't find any hints in [1] or [2] and
>     thus suppose this is maybe an unintentional change between 1.1
>     and 1.2.
> 
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/hadoop_compatibility.html
>     [2]
>     https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/migration.html
> 
>     P.
