Hey Fabi,

many thanks for your clarifications! It seems flink-shaded-hadoop2 itself is already included in the binary distribution:

> $ jar tf flink-1.2.0/lib/flink-dist_2.10-1.2.0.jar | grep org/apache/hadoop | head -n3
> org/apache/hadoop/
> org/apache/hadoop/fs/
> org/apache/hadoop/fs/FileSystem$Statistics$StatisticsAggregator.class
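Conversely, the type information class that got moved out in 1.2.0 should not show up, i.e. the same check for it should come back empty (I'm assuming the class name behind `HADOOP_WRITABLE_TYPEINFO_CLASS` is `org.apache.flink.api.java.typeutils.WritableTypeInfo`):

> $ jar tf flink-1.2.0/lib/flink-dist_2.10-1.2.0.jar | grep WritableTypeInfo
> (expected: no output)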
That's why adding just the hadoop-compatibility jar fixed the problem for me.

I'm not at all into how Flink handles class loading yet, but at first look into `TypeExtractor` I was surprised to see it _not_ using the thread's current context class loader [1] (with a fallback to its own class loader). That is what led me to investigate the jars' contents and find the problem.
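To illustrate, this is the lookup pattern I would have expected, following the idiom described in [1]. Just a sketch, not Flink's actual code (the helper name is made up):

    // Hypothetical sketch: consult the thread's context class loader first,
    // then fall back to the class loader that defined TypeExtractor itself.
    private static Class<?> loadHadoopTypeInfoClass(String className)
            throws ClassNotFoundException {
        ClassLoader contextCl = Thread.currentThread().getContextClassLoader();
        if (contextCl != null) {
            try {
                return Class.forName(className, false, contextCl);
            } catch (ClassNotFoundException ignored) {
                // not visible to the context class loader; try the fallback below
            }
        }
        return Class.forName(className, false, TypeExtractor.class.getClassLoader());
    }

With the context class loader consulted first, a class shipped inside the user's application jar could be found even though `TypeExtractor` itself lives in flink-dist.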
I'll set up a jira ticket for this issue on Monday.

Have a nice weekend,
P.

[1] http://stackoverflow.com/questions/1771679/difference-between-threads-context-class-loader-and-normal-classloader

On 04/07/2017 09:24 PM, Fabian Hueske wrote:
> Hi Petr,
>
> I think that's expected behavior, because the exception is intercepted
> and enriched with an instruction on how to solve the problem.
> As you assumed, you need to add the flink-hadoop-compatibility JAR file
> to the ./lib folder. Unfortunately, the file is not included in the
> binary distribution. You can either build it from source or manually
> download it from a public Maven repository. You might need to add the
> flink-shaded-hadoop2 jar file as well, which is a dependency of
> flink-hadoop-compatibility.
>
> I think we should make that easier for users and add a pre-built jar
> file to the ./opt folder of the binary distribution.
> Would you mind opening a JIRA for this?
>
> Now a bit of background on why we moved the TypeInfo to
> flink-hadoop-compatibility: we are preparing Flink's core to become
> independent of Hadoop, i.e., Flink's core should not require Hadoop. We
> will of course keep the option to run Flink on YARN and write data to
> HDFS, but this should be optional and not baked into the core.
>
> Best,
> Fabian
>
> 2017-04-07 16:27 GMT+02:00 Petr Novotnik <petr.novot...@firma.seznam.cz>:
>
>> Hello,
>>
>> with 1.2.0, `WritableTypeInfo` got moved into its own artifact
>> (flink-hadoop-compatibility_2.10-1.2.0.jar). Unlike with 1.1.0, the
>> distribution jar `flink-dist_2.10-1.2.0.jar` no longer includes the
>> hadoop compatibility classes. However, `TypeExtractor`, which is part
>> of the distribution jar, tries to load `WritableTypeInfo` using the
>> class loader it was itself loaded from:
>>
>> Class<?> typeInfoClass;
>> try {
>>     typeInfoClass = Class.forName(HADOOP_WRITABLE_TYPEINFO_CLASS, false,
>>             TypeExtractor.class.getClassLoader());
>> }
>> catch (ClassNotFoundException e) {
>>     throw new RuntimeException("Could not load the TypeInformation for the class '"
>>             + HADOOP_WRITABLE_CLASS + "'. You may be missing the 'flink-hadoop-compatibility' dependency.");
>> }
>>
>> Adding `flink-hadoop-compatibility` to my application jar leads to the
>> following stack trace on YARN (running `bin/flink run -m yarn-cluster ...`):
>>
>> Caused by: java.lang.RuntimeException: Could not load the TypeInformation for the class 'org.apache.hadoop.io.Writable'. You may be missing the 'flink-hadoop-compatibility' dependency.
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createHadoopWritableTypeInfo(TypeExtractor.java:2025)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1649)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1591)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:778)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:998)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:679)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:629)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:595)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:588)
>>     at org.apache.flink.api.common.typeinfo.TypeHint.<init>(TypeHint.java:47)
>>     at cz.seznam.euphoria.benchmarks.flink.Util$2.<init>(Util.java:80)
>>
>> I guess I'm supposed to customize my Flink installation by adding the
>> hadoop-compatibility jar to Flink's `lib` dir, correct? If so, is this
>> documented? I couldn't find any hints in [1] or [2] and thus suppose
>> this is maybe an unintentional change between 1.1 and 1.2.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/hadoop_compatibility.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/migration.html
>>
>> P.
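P.S. For the archives, the concrete steps that fixed it for me, as a sketch (the Maven Central URL and the 1.2.0 / Scala 2.10 coordinates are my assumptions; adjust them to your setup):

> $ cd flink-1.2.0/lib
> $ # assumed location of the artifact on Maven Central:
> $ curl -LO https://repo1.maven.org/maven2/org/apache/flink/flink-hadoop-compatibility_2.10/1.2.0/flink-hadoop-compatibility_2.10-1.2.0.jar
> $ # then restart the cluster (or re-submit via `bin/flink run -m yarn-cluster ...`)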