Hi,

I am trying to write a user function that writes to Elasticsearch. My project
uses Elasticsearch 2.3.5 (the same version the Flink Elasticsearch connector
uses), but I am seeing the exception recorded in the JIRA below. After
enabling the JVM option -verbose:class, it turns out to be a Guava dependency
issue: the Elasticsearch 2.3.5 transport client uses Guava 18.0 and calls
MoreExecutors.directExecutor, a method that was only added in Guava 18.0.

The class loading log, however, shows that the MoreExecutors class active at
runtime is loaded from the Flink distribution, which causes the
java.lang.NoSuchMethodError.
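
For anyone who wants to confirm the same thing from inside the job rather
than from the class loading log, here is a minimal sketch. The names
ClassOrigin, locationOf and hasNoArgMethod are made up for illustration and
are not part of Flink or Elasticsearch. The defaults in main are JDK names so
that it runs without Guava on the classpath.

```java
import java.security.CodeSource;

// Diagnostic sketch: report where a class was loaded from and whether a
// public no-argument method exists on the version that was actually loaded.
final class ClassOrigin {

    // Returns the jar or directory the class came from, or "(bootstrap)"
    // when the JVM exposes no code source (typical for JDK classes).
    static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return (src == null || src.getLocation() == null)
                ? "(bootstrap)"
                : src.getLocation().toString();
    }

    // True if the loaded class has a public no-argument method with this name.
    static boolean hasNoArgMethod(Class<?> cls, String name) {
        try {
            cls.getMethod(name);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Assumed defaults: JDK names, so no extra dependency is needed.
        String className = args.length > 0 ? args[0] : "java.util.concurrent.Executors";
        String methodName = args.length > 1 ? args[1] : "newCachedThreadPool";
        Class<?> cls = Class.forName(className);
        System.out.println(className + " loaded from " + locationOf(cls));
        System.out.println(methodName + "() present: " + hasNoArgMethod(cls, methodName));
    }
}
```

For the case described here, one would presumably call
locationOf(MoreExecutors.class) and hasNoArgMethod(MoreExecutors.class,
"directExecutor") from within the sink's open() method, so that the lookup
goes through the same class loader as the failing code.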

Based on this finding, it looks like ES 2.3.5 cannot be used with Flink 1.2.0
(which would mean the Elasticsearch connector is broken). Can someone help
clarify?

Also, it looks like some parts of flink-core use shading to relocate classes,
e.g. from com.google.guava to org.apache.flink.***.com.google.guava, which is
a fix for exactly this kind of issue. See, for example,
https://issues.apache.org/jira/browse/FLINK-4587 and
https://issues.apache.org/jira/browse/FLINK-3373.


My Flink cluster is v1.2.0, running in Docker.


Thanks,
Ralph

---------- Forwarded message ----------
From: Su Ralph (JIRA) <j...@apache.org>
Date: Mon, Mar 20, 2017 at 6:41 PM
Subject: [jira] [Updated] (FLINK-6126) Yet another conflict : guava
To: suliang...@gmail.com



     [ https://issues.apache.org/jira/browse/FLINK-6126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Su Ralph updated FLINK-6126:
----------------------------
    Description:
This happens when a user function tries to write to Elasticsearch (the job
depends on elasticsearch 2.3.5).

The stack trace looks like:
java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
        at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
        at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
        at io.sherlock.capabilities.es.AbstractEsSink.open(AbstractEsSink.java:98)

When env.java.opts.taskmanager is set to -verbose:class, the class loading
log shows:
[Loaded com.google.common.util.concurrent.MoreExecutors from file:/opt/flink/lib/flink-dist_2.11-1.2.0.jar]

The user code is using Guava 18.0.

  was:
For some reason I need to use org.apache.httpcomponents:httpasyncclient:4.1.2
in flink.
The source file is:
{code}
import org.apache.flink.streaming.api.scala._
import org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory

/**
  * Created by renkai on 16/9/7.
  */
object Main {
  def main(args: Array[String]): Unit = {
    val instance = ManagedNHttpClientConnectionFactory.INSTANCE
    println("instance = " + instance)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(1 to 100)
    val result = stream.map { x =>
      x * 2
    }
    result.print()
    env.execute("xixi")
  }
}

{code}

and
{code}
name := "flink-explore"

version := "1.0"

scalaVersion := "2.11.8"

crossPaths := false

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-streaming-scala" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.flink" %% "flink-clients" % "1.2-SNAPSHOT"
    exclude("com.google.code.findbugs", "jsr305"),
  "org.apache.httpcomponents" % "httpasyncclient" % "4.1.2"
)
{code}
I use `sbt assembly` to get a fat jar.

If I run the command
{code}
 java -cp flink-explore-assembly-1.0.jar Main
{code}
I get the result

{code}
instance = org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory@4909b8da
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1177584915]
09/07/2016 12:05:26     Job execution switched to status RUNNING.
09/07/2016 12:05:26     Source: Collection Source(1/1) switched to SCHEDULED
09/07/2016 12:05:26     Source: Collection Source(1/1) switched to DEPLOYING
...
09/07/2016 12:05:26     Map -> Sink: Unnamed(20/24) switched to RUNNING
09/07/2016 12:05:26     Map -> Sink: Unnamed(19/24) switched to RUNNING
15> 30
20> 184
...
19> 182
1> 194
8> 160
09/07/2016 12:05:26     Source: Collection Source(1/1) switched to FINISHED
...
09/07/2016 12:05:26     Map -> Sink: Unnamed(1/24) switched to FINISHED
09/07/2016 12:05:26     Job execution switched to status FINISHED.
{code}

Nothing special.

But if I run the jar by
{code}
./bin/flink run flink-explore-assembly-1.0.jar
{code}

I will get an error

{code}
$ ./bin/flink run flink-explore-assembly-1.0.jar
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoSuchFieldError: INSTANCE
        at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
        at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
        at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
        at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
        at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
        at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
        at Main$.main(Main.scala:9)
        at Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:322)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:774)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:250)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1002)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1045)
{code}

I tried hard to find the cause of this exception. Usually it is caused by
another class on the classpath with the same package and class name but
different content, but I checked every jar in FLINK_HOME/lib and there is no
class named DefaultHttpRequestWriterFactory.

I suspect the jar file is somehow broken by
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager, but
I don't have any evidence. Could anyone help?



> Yet another conflict : guava
> ----------------------------
>
>                 Key: FLINK-6126
>                 URL: https://issues.apache.org/jira/browse/FLINK-6126
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System, Local Runtime
>    Affects Versions: 1.2.0
>         Environment: Latest SNAPSHOT
>            Reporter: Su Ralph
>
> This happens when a user function tries to write to Elasticsearch (the job
> depends on elasticsearch 2.3.5).
> The stack trace looks like:
> java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
>         at org.elasticsearch.threadpool.ThreadPool.<clinit>(ThreadPool.java:190)
>         at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:131)
>         at io.sherlock.capabilities.es.AbstractEsSink.open(AbstractEsSink.java:98)
> When env.java.opts.taskmanager is set to -verbose:class, the class loading
> log shows:
> [Loaded com.google.common.util.concurrent.MoreExecutors from file:/opt/flink/lib/flink-dist_2.11-1.2.0.jar]
> The user code is using Guava 18.0.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
