I actually think the logging problem is caused by Hadoop 2.7.3, which pulls in slf4j-log4j12-1.7.10.jar. This log4j 1.x binding is then used, but there is no proper configuration file for it because Flink itself uses log4j2.
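For illustration, a minimal log4j 1.x configuration that would at least give the ClusterEntrypoint an appender could look like the sketch below. This is only meant to show the kind of file the log4j 1.x binding is looking for; it is not the log4j2-style configuration Flink 1.12 ships in its conf directory, and the appender name and pattern are arbitrary.

```
# Sketch of a log4j 1.x config: send everything to the console so the real
# failure shows up in the container logs (appender name and pattern are arbitrary)
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```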
Cheers,
Till

On Fri, Apr 9, 2021 at 12:05 PM Till Rohrmann <trohrm...@apache.org> wrote:

Hi Yik San,

to me it looks as if there is a problem with the job and the deployment. Unfortunately, the logging seems not to have worked. Could you check that you have a valid log4j.properties file in your conf directory?

Cheers,
Till

On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan <evan.chanyik...@gmail.com> wrote:

*The question is cross-posted on Stack Overflow: https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2. Viewing the question on Stack Overflow is preferred, as it includes a few images for better description.*

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads two columns from a Hive table `mysource`, adds them up, and writes the result to another Hive table `mysink`. `mysource` has two columns, `a bigint` and `b bigint`, and `mysink` has a single column, `c bigint`.

The job submits successfully; however, I observe that it keeps retrying.

[![enter image description here][1]][1]

Clicking into each attempt simply shows this:

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page: http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511
Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" tab has no useful info. It complains about the logging libraries, but I believe those are warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```

This is the job, written in Scala:

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
                    |INSERT INTO mysink
                    |SELECT a + b
                    |FROM mysource
                    |""".stripMargin)
  }
}
```
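(A small aside, for comparison with the PyFlink rewrite further below, which blocks on the INSERT with `.wait()`: the Scala job submits the statement and returns without waiting. If one wanted the same blocking behaviour, a sketch — assuming Flink 1.12's `TableResult.await()` — would be:)

```scala
// Sketch only: same INSERT, but keeping the TableResult and blocking until
// the job finishes, mirroring the .wait() call in the PyFlink version below.
// Assumes Flink 1.12's TableResult.await().
val result = tableEnv.executeSql(
  """
    |INSERT INTO mysink
    |SELECT a + b
    |FROM mysource
    |""".stripMargin)
result.await()
```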
Here's the pom.xml:

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>exmple</groupId>
    <artifactId>featurepipelines</artifactId>
    <version>0.1.1</version>
    <packaging>jar</packaging>

    <name>Feature Pipelines</name>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>false</shadedArtifactAttached>
                            <shadedClassifierName>Shade</shadedClassifierName>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>4.4.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

This is how I package the jar:

```
mvn clean package
```
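(A side note on packaging: since `slf4j-log4j12` is declared at compile scope, the shaded jar presumably ends up carrying the log4j 1.x SLF4J binding — it is the first binding listed in the `jobmanager.err` output above. One quick way to confirm what actually got shaded in is to list the jar's contents; `target/` is simply where Maven writes the build output.)

```
jar tf target/featurepipelines-0.1.1.jar | grep StaticLoggerBinder
```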
This is how I run the job:

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrote the job in PyFlink to see what happens. Here is the PyFlink rewrite:

```python
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# There exists such a jar in the path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job:

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine: it finishes quickly, with the result written to the `mysink` table.

## Why?

Given the comparison, I strongly suspect the Scala job fails because it is not packaged correctly, even though I follow the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven), as can be verified by looking at my pom:

> If you are building your own program, you need the following dependencies in your mvn file. It's recommended not to include these dependencies in the resulting jar file. You're supposed to add dependencies as stated above at runtime.

```xml
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>${hive.version}</version>
  <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in /lib of my Flink distribution, as suggested in the [Flink docs](https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar):

> the recommended way to add dependency is to use a bundled jar. Separate jars should be used only if bundled jars don't meet your needs.

What am I missing?
Best,
Yik San

[1]: https://i.stack.imgur.com/fBsHS.png
[2]: https://i.stack.imgur.com/ilNtr.png