[ https://issues.apache.org/jira/browse/FLINK-8318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313362#comment-16313362 ]

Jihyun Cho edited comment on FLINK-8318 at 1/5/18 4:14 PM:
-----------------------------------------------------------

Here are my code and pom files.
{code:title=Test.scala}
import java.net.{InetAddress, InetSocketAddress}
import java.util.{Properties, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

object Test {
  val consumeProperties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-001:9092")
    props.setProperty("group.id", "test")
    props.setProperty("auto.offset.reset", "latest")
    props
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val stream = env.addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), consumeProperties))


    val config = new java.util.HashMap[String, String]
    config.put("cluster.name", "test")

    val transportAddresses = new java.util.ArrayList[InetSocketAddress]
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("es-001"), 9300))

    val esSink = new ElasticsearchSink[String](config, transportAddresses,
      new ElasticsearchSinkFunction[String] {
        def createIndexRequest(t: String): IndexRequest = {
          Requests.indexRequest()
            .index("test")
            .`type`("message")
            .source(t)
        }

        override def process(t: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer) = {
          requestIndexer.add(createIndexRequest(t))
        }
      }
    )

    stream.map { value =>
      try {
        val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))
        implicit val formats = DefaultFormats
        val json = parse(value)
        val transJson = json transformField {
          case JField("short_message", JString(s)) => ("message", JString(s))
          case JField("host", JString(s)) => ("source", JString(s))
          case JField("timestamp", JInt(i)) => ("timestamp", 
JString(esDateFormat.format((i * 1000L).toLong)))
          case JField("timestamp", JDouble(d)) => ("timestamp", 
JString(esDateFormat.format((d * 1000L).toLong)))
          case JField(k, v) => (k.stripPrefix("_"), v)
        }
        write(transJson)
      } catch {
        case _: Exception => ""
      }
    }.filter(_.nonEmpty).addSink(esSink)

    env.execute("Test")
  }
}
{code}
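
For reference, the renaming logic can be checked outside Flink. Below is a minimal, self-contained sketch of the same json4s transformation applied to an illustrative GELF-style record; the sample values are mine, not from the job above, and the JDouble timestamp case is omitted for brevity.
{code:title=TransformCheck.scala}
import java.util.TimeZone

import org.apache.commons.lang3.time.FastDateFormat
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.write

// Sketch of the map step above, runnable without Flink or Kafka.
// The sample record is illustrative, not taken from the original report.
object TransformCheck extends App {
  implicit val formats = DefaultFormats
  val esDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss.SSS", TimeZone.getTimeZone("UTC"))

  val sample = """{"short_message":"hello","host":"web-01","timestamp":1514300000,"_app":"test"}"""
  val transJson = parse(sample) transformField {
    case JField("short_message", JString(s)) => ("message", JString(s))
    case JField("host", JString(s)) => ("source", JString(s))
    case JField("timestamp", JInt(i)) => ("timestamp", JString(esDateFormat.format((i * 1000L).toLong)))
    case JField(k, v) => (k.stripPrefix("_"), v)
  }
  // Prints: {"message":"hello","source":"web-01","timestamp":"2017-12-26 14:53:20.000","app":"test"}
  println(write(transJson))
}
{code}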
{code:title=pom.xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>test.streaming</groupId>
    <artifactId>test</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.4.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-contrib_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-native_2.11</artifactId>
            <version>3.5.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>test.streaming.Test</mainClass>
                        </transformer>
                    </transformers>
                    <finalName>${project.artifactId}</finalName>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
{code}
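
As a possible workaround (my suggestion; I have not verified it against this exact setup), the job jar could shade Jackson, so that the Elasticsearch client bundled in the fat jar no longer references the package name that CDH's jackson-core 2.2 occupies under parent-first resolution. The maven-shade-plugin configuration above would gain a relocation along these lines:
{code:title=pom.xml (relocation sketch)}
<relocations>
    <relocation>
        <!-- Rewrite all Jackson references inside the shaded jar to a private
             namespace so they cannot collide with the jackson-core 2.2 that
             CDH puts on the parent classpath. Hypothetical addition; untested
             against this setup. -->
        <pattern>com.fasterxml.jackson</pattern>
        <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
    </relocation>
</relocations>
{code}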


> Conflict jackson library with ElasticSearch connector
> -----------------------------------------------------
>
>                 Key: FLINK-8318
>                 URL: https://issues.apache.org/jira/browse/FLINK-8318
>             Project: Flink
>          Issue Type: Bug
>          Components: ElasticSearch Connector, Startup Shell Scripts
>    Affects Versions: 1.4.0
>            Reporter: Jihyun Cho
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.1
>
>
> My Flink job fails after updating Flink to 1.4.0. It uses the 
> Elasticsearch connector.
> I'm using CDH Hadoop with the Flink option "classloader.resolve-order: 
> parent-first".
> The failure log is below.
> {noformat}
> Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
> /etc/hadoop/conf:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/./:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-yarn/.//*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/lib/*:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//*
> 2017-12-26 14:13:21,160 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              - 
> --------------------------------------------------------------------------------
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Starting 
> TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
> 2017-12-26 14:13:21,161 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  OS current 
> user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Current 
> Hadoop/Kerberos user: www
> 2017-12-26 14:13:21,446 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JVM: Java 
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Maximum heap 
> size: 31403 MiBytes
> 2017-12-26 14:13:21,447 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JAVA_HOME: 
> (not set)
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Hadoop 
> version: 2.6.5
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  JVM Options:
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     -Xms32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     -Xmx32768M
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -XX:MaxDirectMemorySize=8388607T
> 2017-12-26 14:13:21,448 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Djava.library.path=/home/cloudera/parcels/CDH/lib/hadoop/lib/native/
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Dlog4j.configuration=file:/home/www/service/flink-1.4.0/conf/log4j-console.properties
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> -Dlogback.configurationFile=file:/home/www/service/flink-1.4.0/conf/logback-console.xml
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Program 
> Arguments:
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> --configDir
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -     
> /home/www/service/flink-1.4.0/conf
> 2017-12-26 14:13:21,449 INFO  
> org.apache.flink.runtime.taskmanager.TaskManager              -  Classpath:
> ...:/home/cloudera/parcels/CDH-5.12.1-1.cdh5.12.1.p0.3/lib/hadoop/libexec/../../hadoop-mapreduce/.//jackson-core-2.2.3.jar:...
> ....
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> Unnamed (3/10) (fb33a6e0c1a7e859eaef9cf8bcf4565e) switched from RUNNING to 
> FAILED.
> java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
>         at 
> org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:76)
>         at 
> org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:59)
>         at 
> org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:726)
>         at 
> org.elasticsearch.common.settings.Setting.lambda$listSetting$26(Setting.java:672)
>         at 
> org.elasticsearch.common.settings.Setting$2.getRaw(Setting.java:676)
>         at 
> org.elasticsearch.common.settings.Setting.lambda$listSetting$24(Setting.java:660)
>         at 
> org.elasticsearch.common.settings.Setting.listSetting(Setting.java:665)
>         at 
> org.elasticsearch.common.settings.Setting.listSetting(Setting.java:660)
>         at 
> org.elasticsearch.common.network.NetworkService.<clinit>(NetworkService.java:50)
>         at 
> org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>         at 
> org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>         at 
> org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>         at 
> org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>         at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> 2017-12-26 14:14:01,393 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> Unnamed (8/10) (e12caa9cc12027738e2426d3a3641bba) switched from RUNNING to 
> FAILED.
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.elasticsearch.common.network.NetworkService
>         at 
> org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:91)
>         at 
> org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:119)
>         at 
> org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:247)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:125)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:111)
>         at 
> org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:101)
>         at 
> org.apache.flink.streaming.connectors.elasticsearch5.Elasticsearch5ApiCallBridge.createClient(Elasticsearch5ApiCallBridge.java:73)
>         at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:281)
>         at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The symbol "FAIL_ON_SYMBOL_HASH_OVERFLOW" has been added in 2.4. But CDH 
> Hadoop is using jackson version 2.2. So there is a conflict between the two 
> versions.
> I reverted changes of https://issues.apache.org/jira/browse/FLINK-7477, and 
> the problem is disappeared.


