[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313362#comment-16313362 ]
Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM:
-----------------------------------------------------------

Here are my code and pom files.

{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  // Kafka consumer configuration for the source topic.
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.addSource(
      new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))

    // Elasticsearch 5 sink: index each record as a document of type "message".
    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")
    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))
    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest =
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        // Rename fields, convert numeric epoch timestamps to the ES date
        // format, and strip leading underscores from the remaining keys.
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => "" // drop records that fail to parse
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}

{code:title=pom.xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                    <finalName>${project.artifactId}</finalName>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
{code}
> Conflict jackson library with ElasticSearch connector
> -----------------------------------------------------
>
>                 Key: FLINK-8318
>                 URL: https://issues.apache.org/jira/browse/FLINK-8318
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector, Startup Shell Scripts
>    Affects Versions: 1.4.0
>            Reporter: Jihyun Cho
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.1
>
> My Flink job fails after updating Flink to version 1.4.0.
> It uses the ElasticSearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order: parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - --------------------------------------------------------------------------------
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - OS current user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Current Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Maximum heap size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JAVA_HOME: (not set)
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Hadoop version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - JVM Options:
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Program Arguments:
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     --configDir
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              -     /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Classpath:
> ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> ....
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to FAILED.
> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>         at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:76)
>         at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:59)
>         at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:726)
>         at org.elasticsearch.common.settings.Setting.lambda$listSetting$26(Setting.java:672)
>         at org.elasticsearch.common.settings.Setting$2.getRaw(Setting.java:676)
>         at org.elasticsearch.common.settings.Setting.lambda$listSetting$24(Setting.java:660)
>         at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:665)
>         at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:660)
>         at org.elasticsearch.common.network.NetworkService.<clinit>(NetworkService.java:50)
>         at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>         at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>         at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>         at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>         at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Filter -> Map -> Filter -> Sink: Unnamed (8/10) (e12caa9cc12027738e2426d3a3641bba) switched from RUNNING to FAILED.
> java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.common.network.NetworkService
>         at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>         at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>         at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>         at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>         at org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>         at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The field FAIL_ON_SYMBOL_HASH_OVERFLOW was added in Jackson 2.4, but CDH Hadoop ships Jackson 2.2, so the two versions conflict on the classpath.
> I reverted the changes of https://issues.apache.org/jira/browse/FLINK-7477, and the problem disappeared.
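> To confirm which jackson-core actually wins under parent-first classloading, a minimal sketch like the following can be run on the same classpath (the JacksonCheck object name is made up for illustration):
> {code:title=JacksonCheck.scala}
> import com.fasterxml.jackson.core.JsonFactory
>
> // Diagnostic sketch: print where jackson-core was loaded from. Under
> // parent-first classloading on this setup it should point at the
> // Hadoop-provided jackson-core-2.2.3.jar rather than the newer copy.
> object JacksonCheck {
>   def main(args: Array[String]): Unit = {
>     val source = Option(classOf[JsonFactory].getProtectionDomain.getCodeSource)
>       .map(_.getLocation.toString)
>       .getOrElse("(bootstrap / unknown)")
>     println(s"jackson-core loaded from: $source")
>     // Manifest version, if the jar carries one.
>     val version = Option(classOf[JsonFactory].getPackage)
>       .flatMap(p => Option(p.getImplementationVersion))
>       .getOrElse("(unknown)")
>     println(s"implementation version: $version")
>   }
> }
> {code}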
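> One possible workaround, sketched below under the assumption that the job jar keeps being built with the maven-shade-plugin from the pom above, is to relocate Jackson into a private namespace so the Elasticsearch client never resolves against Hadoop's 2.2 classes (the shaded.flink.test prefix is an arbitrary choice):
> {code:title=pom.xml (relocation sketch)}
> <plugin>
>     <groupId>org.apache.maven.plugins</groupId>
>     <artifactId>maven-shade-plugin</artifactId>
>     <version>2.4.3</version>
>     <configuration>
>         <relocations>
>             <!-- Rewrite com.fasterxml.jackson.* inside the shaded job jar so
>                  it cannot clash with the Jackson 2.2 on the Hadoop classpath. -->
>             <relocation>
>                 <pattern>com.fasterxml.jackson</pattern>
>                 <shadedPattern>shaded.flink.test.com.fasterxml.jackson</shadedPattern>
>             </relocation>
>         </relocations>
>     </configuration>
> </plugin>
> {code}
> Alternatively, leaving "classloader.resolve-order" at its child-first default in flink-conf.yaml would let the job's newer Jackson take precedence, though parent-first was set here for CDH compatibility.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)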