Hi Robert,
Thank you for the hints. I tried to narrow down the error:
Flink version: 0.10-SNAPSHOT
Neo4j version: 2.3.0
I start with two dependencies:
flink-java
flink-gelly
(1) Add neo4j-harness and run basic example from Neo4j [1]
Leads to:
java.lang.ClassNotFoundException: org.eclipse.jetty.server.ConnectionFactory
(2) I excluded jetty-server from flink-java and flink-gelly
It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:
java.lang.NoSuchMethodError: org.eclipse.jetty.servlet.ServletContextHandler.<init>
(3) I excluded jetty-servlet from flink-java and flink-gelly
It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:
java.lang.NoSuchMethodError: scala.Predef$.$conforms()
(4) I excluded scala-library from flink-java and flink-gelly
It now uses scala-library:2.11.7 (was 2.10.4)
Now the basic Neo4j example (without Flink) runs.
Next, I added Flink to the mix and wrote a simple test using
neo4j-harness features, ExecutionEnvironment and my InputFormat.
Leads to:
java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
at akka.actor.RootActorPath.$div(ActorPath.scala:159)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
This is where I don't know what to exclude next. It seems that some
components (akka?) need scala 2.10.4, while Neo4j (cypher) depends on
scala 2.11.7.
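One way to check which JAR actually supplies a conflicting class after each exclusion is to ask the class loader directly. The following is only a minimal sketch under assumptions: the class name ClassOrigin and the default list of classes to check are made up for illustration, and it only tells you something useful when run with the same classpath as the failing test.

```java
import java.security.CodeSource;

/** Prints which JAR (or directory) each named class was loaded from. */
final class ClassOrigin {

    /** Returns the code source location of the class, or a short note if it is unavailable. */
    static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source
            if (src == null || src.getLocation() == null) {
                return "<bootstrap or unknown>";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "<not loadable: " + e + ">";
        }
    }

    public static void main(String[] args) {
        // Hypothetical default list: one class per library involved in the conflicts above
        String[] names = args.length > 0 ? args : new String[] {
            "scala.Predef",
            "akka.actor.ActorSystem",
            "org.eclipse.jetty.server.Server"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If akka and the Neo4j classes both resolve scala.Predef from the same scala-library JAR, further exclusions are unlikely to help, because Scala 2.10 and 2.11 are not binary compatible with each other.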
How can I make use of the maven shade plugin in that case?
Again, thank you!
Cheers,
Martin
[1]
http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
(testMyExtensionWithFunctionFixture())
On 06.11.2015 16:17, Robert Metzger wrote:
Hi Martin,
what exactly were the issues you were facing with the dependency conflicts?
There is a way around these issues, and it's called shading:
https://maven.apache.org/plugins/maven-shade-plugin/
In Flink we have several shaded modules (curator, hadoop) .. we could add a
neo4j-harness-shaded module which relocates conflicting dependencies into a
different namespace. That way, you can execute different versions of the
same library (jetty, scala) at the same time.
Since the module would only contain dependencies needed at
test time, we could exclude it from releases.
Regarding Scala, it would be fine to execute the neo4j tests only with
scala 2.11 builds. It's not hard to control this using maven build profiles.
Do you really need jetty? Is neo4j starting the web interface also for the
tests?
Regards,
Robert
On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:
Hi,
I could use your input on testing the input format with Flink.
As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
for unit testing server extensions / REST applications. The problem here
is that the dependencies of Flink conflict with the dependencies of
neo4j-harness (e.g. jetty, scala-library). I tried to figure out what
combination could run using the maven exclude mechanism, but no success.
So I thought about workarounds:
(1) Michael Hunger (neo4j) started a project and invited me to
contribute [1]. What it does during tests is:
- download a neo4j-<version>.tar.gz into a temp folder
- extract and start a neo4j instance
- run tests
- stop and discard neo4j
I like the concept, but I guess the problem is that it runs outside of
maven and I guess downloading from external resources (especially in
travis-ci) could lead to problems.
(2) I had a look into the other input formats. flink-hbase uses examples
instead of unit tests. This could be an option as long as there is no
clean solution for "real" unit testing.
What do you think?
Cheers, Martin
[1] https://github.com/jexp/neo4j-starter
On 03.11.2015 01:18, Stephan Ewen wrote:
Wow, very nice results :-)
This input format alone is probably a very useful contribution, so I would
open a contribution there once you manage to get a few tests running.
I know little about neo4j, is there a way to read cypher query results in
parallel? (most systems do not expose such an interface, but maybe neo4j is
special there).
I recall at some point in time Martin Neumann asking about a way to create
dense contiguous unique IDs for creating graphs that can be bulk-imported
into neo4j. There is code for that in the data set utils, this may be
valuable for an output format.
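To illustrate what "dense contiguous unique IDs" means in a partitioned setting, here is a small plain-Java sketch of one common two-pass approach: count the elements per partition, turn the counts into starting offsets, then number each partition locally. It is an assumption about the general technique, not a copy of the Flink data set utils; the class DenseIds and the list-of-lists stand-in for partitions are made up for the example, and elements are assumed to be distinct.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DenseIds {

    /** Assigns IDs 0..n-1 across partitions without gaps. */
    static <T> Map<T, Long> assign(List<List<T>> partitions) {
        // Pass 1: each partition reports its size; a prefix sum gives its starting offset.
        long[] offsets = new long[partitions.size()];
        long running = 0;
        for (int p = 0; p < partitions.size(); p++) {
            offsets[p] = running;
            running += partitions.get(p).size();
        }
        // Pass 2: each partition numbers its own elements, starting at its offset.
        // In a distributed job this pass needs no coordination between partitions.
        Map<T, Long> ids = new LinkedHashMap<>();
        for (int p = 0; p < partitions.size(); p++) {
            long next = offsets[p];
            for (T element : partitions.get(p)) {
                ids.put(element, next++);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c"),
            Arrays.asList("d", "e", "f"));
        System.out.println(assign(partitions));
    }
}
```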
On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:
Hi,
I wanted to give you a little update. I created a non-parallel
InputFormat which reads Cypher results from Neo4j into Tuples [1].
It can be used like the JDBCInputFormat:
String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
Neo4jInputFormat.buildNeo4jInputFormat()
.setRestURI(restURI)
.setCypherQuery(q)
.setUsername("neo4j")
.setPassword("test")
.setConnectTimeout(1000)
.setReadTimeout(1000)
.finish();
Atm, to run the tests, a Neo4j instance needs to be up and running.
I tried to get neo4j-harness [2] into the project, but there are some
dependency conflicts which I need to figure out.
I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j
running on the same machine. My dataset is the Polish wiki dump [3]
which consists of 430,602 pages and 2,727,302 links. The protocol is:
1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
2) Convert Tuple2 into Tuple3<Integer, Integer, Double> for edge value
3) Create Gelly graph from Tuple3, init vertex values to 1.0
4) Run PageRank with beta=0.85 and 5 iterations
This takes about 22 seconds on my machine which is very promising.
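For readers without a Flink setup, steps 2 to 4 of the protocol can be sketched in plain Java. This is a hedged illustration of the computation only, not the Gelly implementation used in the benchmark: the class PageRankSketch is made up, the edge value is assumed to be 1/outDegree(source), and the unnormalized PageRank update (1 - beta) + beta * sum is an assumption chosen because it fits an initial vertex value of 1.0.

```java
import java.util.HashMap;
import java.util.Map;

final class PageRankSketch {

    /** Runs a fixed number of PageRank iterations over directed {source, target} edges. */
    static Map<Integer, Double> pageRank(int[][] edges, double beta, int iterations) {
        // Step 2: the edge value is 1/outDegree(source), so count out-degrees first.
        Map<Integer, Integer> outDegree = new HashMap<>();
        // Step 3: every vertex that appears in an edge starts with rank 1.0.
        Map<Integer, Double> rank = new HashMap<>();
        for (int[] e : edges) {
            outDegree.merge(e[0], 1, Integer::sum);
            rank.put(e[0], 1.0);
            rank.put(e[1], 1.0);
        }
        // Step 4: each vertex sends rank/outDegree along its out-edges,
        // then every vertex combines what it received with the damping term.
        for (int i = 0; i < iterations; i++) {
            Map<Integer, Double> received = new HashMap<>();
            for (int[] e : edges) {
                received.merge(e[1], rank.get(e[0]) / outDegree.get(e[0]), Double::sum);
            }
            Map<Integer, Double> next = new HashMap<>();
            for (Integer v : rank.keySet()) {
                next.put(v, (1 - beta) + beta * received.getOrDefault(v, 0.0));
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Tiny made-up graph; the real input would be the Tuple2 edge ids read from Neo4j.
        int[][] edges = { {1, 2}, {2, 3}, {3, 1}, {3, 2} };
        System.out.println(pageRank(edges, 0.85, 5));
    }
}
```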
Next steps are:
- OutputFormat
- Better Unit tests (integrate neo4j-harness)
- Bigger graphs :)
Any ideas and suggestions are of course highly appreciated :)
Best,
Martin
[1] https://github.com/s1ck/flink-neo4j
[2]
http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
[3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
On 29.10.2015 14:51, Vasiliki Kalavri wrote:
Hello everyone,
Martin, Martin, Alex (cc'ed) and myself have started discussing
implementing a neo4j-Flink connector. I've opened a corresponding JIRA
(FLINK-2941) containing an initial document [1], but we'd also like to
share our ideas here to engage the community and get your feedback.
We've had a skype call today and I will try to summarize some of the key
points here. The main use-cases we see are the following:
- Use Flink for ETL / creating the graph and then insert it into a graph
database, like neo4j, for querying and search.
- Query neo4j on some topic or search the graph for patterns and extract a
subgraph, on which we'd then like to run some iterative graph analysis
task. This is where Flink/Gelly can help, by complementing the querying
(neo4j) with efficient iterative computation.
We all agreed that the main challenge is efficiently getting the data out
of neo4j and into Flink. There have been some attempts to do similar things
with neo4j and Spark, but the performance results are not very promising:
- Mazerunner [2] is using HDFS for communication. We think that it's not
worth going in this direction, as dumping the neo4j database to
HDFS and then reading it back to Flink would probably be terribly slow.
- In [3], you can see Michael Hunger's findings on using neo's HTTP
interface to import data into Spark, run PageRank and then put data back
into neo4j. It seems that this took > 2h for a 125m edge graph. The main
bottlenecks appear to be (1) reading the data as an RDD => this had to be
performed in small batches to avoid OOM errors and (2) PageRank
computation itself, which seems weird to me.
We decided to experiment with neo4j HTTP and Flink and we'll report back
when we have some results.
In the meantime, if you have any ideas on how we could speed up reading
from neo4j or any suggestion on approaches that I haven't mentioned, please
feel free to reply to this e-mail or add your comment in the shared
document.
Cheers,
-Vasia.
[1]:
https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
[2]: https://github.com/kbastani/neo4j-mazerunner
[3]: https://gist.github.com/jexp/0dfad34d49a16000e804