I actually think that the logging problem is caused by Hadoop 2.7.3, which
pulls in slf4j-log4j12-1.7.10.jar. This binding is then used, but there is
no proper configuration file for log4j because Flink actually uses
log4j2.
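
For illustration only, a sketch of how the log4j 1.x binding could be kept
out of the job jar itself: the quoted SLF4J output below also reports a
StaticLoggerBinder inside featurepipelines-0.1.1.jar, which comes from the
pom's compile-scoped slf4j-log4j12 dependency, so one option (assuming the
job does not need that binding) would be to mark it provided or drop it
entirely:

```xml
<!-- Sketch only: keep the log4j 1.x SLF4J binding out of the shaded jar;
     alternatively, remove this dependency altogether. -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
```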

Cheers,
Till

On Fri, Apr 9, 2021 at 12:05 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Yik San,
>
> to me it looks as if there is a problem with the job and the deployment.
> Unfortunately, the logging seems not to have worked. Could you check that
> you have a valid log4j.properties file in your conf directory?
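>
> For reference, a minimal log4j.properties sketch in the log4j2 properties
> format that Flink 1.12 reads (adapted from Flink's default configuration;
> adjust levels and pattern as needed):
>
> ```
> # Minimal log4j2-style properties file for Flink 1.12
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> appender.console.name = ConsoleAppender
> appender.console.type = CONSOLE
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> ```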
>
> Cheers,
> Till
>
> On Wed, Apr 7, 2021 at 4:57 AM Yik San Chan <evan.chanyik...@gmail.com>
> wrote:
>
>> *The question is cross-posted on Stack
>> Overflow: https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2.
>> Viewing the question on Stack Overflow is preferred, as I include a few
>> images for a better description.*
>>
>> Hi community,
>>
>> ## Flink (Scala) exitCode=2
>>
>> I have a simple Flink job that reads from 2 columns of a Hive table
>> `mysource`, adds the columns up, then writes the result to another Hive
>> table `mysink`, where `mysource` has 2 columns `a bigint` and `b bigint`,
>> and `mysink` has a single column `c bigint`.
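>>
>> For reference, hypothetical Hive DDL matching that description
>> (reconstructed from the column descriptions above, not the actual DDL):
>>
>> ```sql
>> -- input table: two bigint columns
>> CREATE TABLE mysource (a BIGINT, b BIGINT);
>> -- output table: a single bigint column
>> CREATE TABLE mysink (c BIGINT);
>> ```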
>>
>> The job submits successfully; however, I observe that it keeps retrying.
>>
>> [![Screenshot of the repeated application attempts][1]][1]
>>
>> When I click into each attempt, it simply shows this:
>>
>> ```
>> AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
>> For more detailed output, check application tracking page:
>> http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511
>> Then, click on links to logs of each attempt.
>> Diagnostics: Exception from container-launch.
>> Container id: container_e13_1607399514900_2511_1267_000001
>> Exit code: 2
>> Stack trace: ExitCodeException exitCode=2:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
>> at org.apache.hadoop.util.Shell.run(Shell.java:479)
>> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
>> at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748)
>> Container exited with a non-zero exit code 2
>> Failing this attempt
>> ```
>>
>> However, the "Logs" show no useful info: they complain about the logging
>> libs, but I believe those are really warnings, not errors.
>>
>> ```
>> LogType:jobmanager.err
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:1010
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
>> log4j:WARN Please initialize the log4j system properly.
>> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>> End of LogType:jobmanager.err
>>
>> LogType:jobmanager.out
>> Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
>> LogLength:0
>> Log Contents:
>> End of LogType:jobmanager.out
>> ```
>>
>> This is the job written in Scala.
>>
>> ```scala
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>> import org.apache.flink.table.catalog.hive.HiveCatalog
>>
>> object HiveToyExample {
>>   def main(args: Array[String]): Unit = {
>>     val settings = EnvironmentSettings.newInstance.build
>>     val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>     val tableEnv = StreamTableEnvironment.create(execEnv, settings)
>>
>>     val hiveCatalog = new HiveCatalog(
>>       "myhive",
>>       "aiinfra",
>>       "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
>>     )
>>     tableEnv.registerCatalog("myhive", hiveCatalog)
>>     tableEnv.useCatalog("myhive")
>>
>>     tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>>
>>     tableEnv
>>       .executeSql("""
>>           |INSERT INTO mysink
>>           |SELECT a + b
>>           |FROM mysource
>>           |""".stripMargin)
>>   }
>> }
>> ```
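>>
>> (Note: unlike the PyFlink version further below, which blocks on the result
>> via `.wait()`, this job returns right after submission. For parity, a sketch
>> assuming Flink 1.12's `TableResult#await`:)
>>
>> ```scala
>> // Sketch: block until the INSERT job finishes instead of returning
>> // immediately after submission.
>> val result = tableEnv.executeSql(
>>   """
>>     |INSERT INTO mysink
>>     |SELECT a + b
>>     |FROM mysource
>>     |""".stripMargin)
>> result.await()
>> ```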
>>
>> Here's the pom.xml.
>>
>> ```xml
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd">
>> <modelVersion>4.0.0</modelVersion>
>>
>> <groupId>exmple</groupId>
>> <artifactId>featurepipelines</artifactId>
>> <version>0.1.1</version>
>> <packaging>jar</packaging>
>>
>> <name>Feature Pipelines</name>
>>
>> <properties>
>> <maven.compiler.source>8</maven.compiler.source>
>> <maven.compiler.target>8</maven.compiler.target>
>> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>> <flink.version>1.12.0</flink.version>
>> <scala.binary.version>2.11</scala.binary.version>
>> <scala.version>2.11.12</scala.version>
>> <log4j.version>2.12.1</log4j.version>
>> </properties>
>>
>> <dependencies>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-clients_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.flink</groupId>
>> <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>> <version>${flink.version}</version>
>> <scope>provided</scope>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.hive</groupId>
>> <artifactId>hive-exec</artifactId>
>> <version>2.1.0</version>
>> <scope>provided</scope>
>> </dependency>
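>> <!-- Note: the only dependency here not marked provided; the shade plugin
>> bundles this log4j 1.x binding into the fat jar (cf. the SLF4J "multiple
>> bindings" warning above). -->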
>> <dependency>
>> <groupId>org.slf4j</groupId>
>> <artifactId>slf4j-log4j12</artifactId>
>> <version>1.7.7</version>
>> </dependency>
>> </dependencies>
>>
>> <build>
>> <resources>
>> <resource>
>> <directory>src/main/resources</directory>
>> <filtering>true</filtering>
>> </resource>
>> </resources>
>> <plugins>
>> <plugin>
>> <groupId>org.apache.maven.plugins</groupId>
>> <artifactId>maven-shade-plugin</artifactId>
>> <version>3.2.4</version>
>> <executions>
>> <execution>
>> <phase>package</phase>
>> <goals>
>> <goal>shade</goal>
>> </goals>
>> <configuration>
>> <shadedArtifactAttached>false</shadedArtifactAttached>
>> <shadedClassifierName>Shade</shadedClassifierName>
>> <createDependencyReducedPom>false</createDependencyReducedPom>
>> <filters>
>> <filter>
>> <artifact>*:*</artifact>
>> <excludes>
>> <exclude>META-INF/*.SF</exclude>
>> <exclude>META-INF/*.DSA</exclude>
>> <exclude>META-INF/*.RSA</exclude>
>> </excludes>
>> </filter>
>> </filters>
>> </configuration>
>> </execution>
>> </executions>
>> </plugin>
>> <plugin>
>> <groupId>net.alchim31.maven</groupId>
>> <artifactId>scala-maven-plugin</artifactId>
>> <version>4.4.1</version>
>> <executions>
>> <execution>
>> <goals>
>> <goal>compile</goal>
>> <goal>testCompile</goal>
>> </goals>
>> </execution>
>> </executions>
>> </plugin>
>> </plugins>
>> </build>
>> </project>
>> ```
>>
>> This is how I package the jar.
>>
>> ```
>> mvn clean package
>> ```
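>>
>> (To double-check what ends up in the fat jar, one can list the logging
>> classes it bundles; a hypothetical check:)
>>
>> ```
>> jar tf target/featurepipelines-0.1.1.jar | grep -i -e slf4j -e log4j
>> ```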
>>
>> This is how I run the job.
>>
>> ```
>> flink run \
>> --yarnname scalaflink-hive-test \
>> -m yarn-cluster \
>> -yarnqueue datadev \
>> --class featurepipelines.ingestion.HiveToyExample \
>> ./featurepipelines-0.1.1.jar
>> ```
>>
>> ## PyFlink rewrite works just fine?!
>>
>> Since the logic is so simple, I rewrote the job in PyFlink to see what
>> happens. Here is the PyFlink rewrite.
>>
>> ```python
>> import os
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import *
>> from pyflink.table.catalog import HiveCatalog
>>
>> settings = EnvironmentSettings.new_instance().use_blink_planner().build()
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)
>>
>> # Such a jar exists at this path
>> t_env.get_config().get_configuration().set_string(
>>     "pipeline.jars",
>>     f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
>> )
>>
>> catalog_name = "myhive"
>> default_database = "aiinfra"
>> hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
>>
>> hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
>> t_env.register_catalog(catalog_name, hive_catalog)
>> t_env.use_catalog(catalog_name)
>>
>> TRANSFORM_DML = """
>> INSERT INTO mysink
>> SELECT a + b
>> FROM mysource
>> """
>>
>> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
>> t_env.execute_sql(TRANSFORM_DML).wait()
>> ```
>>
>> This is how I run the PyFlink job.
>>
>> ```
>> flink run \
>> --yarnname pyflink-hive-test \
>> -m yarn-cluster \
>> -yD yarn.application.queue=tech_platform \
>> -pyarch pyflink1.12.0.zip \
>> -pyexec /data/software/pyflink1.12.0/bin/python \
>> -py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
>> ```
>>
>> Surprisingly, the job runs fine - it finishes soon, with the result written
>> to the `mysink` table.
>>
>> ## Why?
>>
>> Given the comparison, I strongly suspect the Scala job fails because it is
>> not packaged correctly, even though I follow the [Flink docs](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven),
>> as can be verified by looking at my pom.
>>
>> > If you are building your own program, you need the following
>> dependencies in your mvn file. It’s recommended not to include these
>> dependencies in the resulting jar file. You’re supposed to add dependencies
>> as stated above at runtime.
>>
>> ```
>> <!-- Flink Dependency -->
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-connector-hive_2.11</artifactId>
>>   <version>1.12.0</version>
>>   <scope>provided</scope>
>> </dependency>
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>   <version>1.12.0</version>
>>   <scope>provided</scope>
>> </dependency>
>>
>> <!-- Hive Dependency -->
>> <dependency>
>>     <groupId>org.apache.hive</groupId>
>>     <artifactId>hive-exec</artifactId>
>>     <version>${hive.version}</version>
>>     <scope>provided</scope>
>> </dependency>
>> ```
>>
>> Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in the
>> /lib directory of my Flink distribution, as suggested in the [Flink docs](
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
>> ):
>>
>> > the recommended way to add dependency is to use a bundled jar. Separate
>> jars should be used only if bundled jars don’t meet your needs.
>>
>> What am I missing?
>>
>> Best,
>> Yik San
>>
>>   [1]: https://i.stack.imgur.com/fBsHS.png
>>   [2]: https://i.stack.imgur.com/ilNtr.png
>>
>
