Hi Robert,
Thank you for the reply! At the moment we are just "playing" with Neo4j and
Flink, but the InputFormat should be available in Flink eventually.
Concerning the license: I did not think of that, but yes, I can make it
available in maven central. I just need to find out how to do this.
I created a branch that includes the dependency problem [1]. There is a
test case "neo4jOnly" [2] which does not use Flink and works fine in a
project where only neo4j-harness is included. However, when I add
flink-java and flink-gelly (excluding flink-clients because of jetty) to
the project, the neo4jOnly test fails with:
org.neo4j.server.ServerStartupException: Starting Neo4j failed:
com.sun.jersey.core.reflection.ReflectionHelper.classForNamePA(Ljava/lang/String;)Ljava/security/PrivilegedAction;
I compared the dependencies of the "clean" neo4j-harness project and made
sure the dependencies and versions are the same. ReflectionHelper is
part of jersey-core which is included.
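A NoSuchMethodError like the one above usually means that a different version of the class was loaded than the one the caller was compiled against. As a minimal sketch for checking this (the class name ClassOriginProbe and its two helpers are made up for illustration and are not part of Flink or Neo4j), one could print where a class was loaded from and whether it declares a given method:

```java
import java.security.CodeSource;
import java.util.Arrays;

/** Hypothetical diagnostic helper; not part of Flink or Neo4j. */
class ClassOriginProbe {

    /** Returns the jar or directory a class was loaded from, or a marker if unknown. */
    static String originOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // JDK classes loaded by the bootstrap loader typically have no code source.
        return source == null ? "bootstrap/unknown" : String.valueOf(source.getLocation());
    }

    /** True if the class declares a method with this name (signature is not checked). */
    static boolean declaresMethod(Class<?> clazz, String methodName) {
        return Arrays.stream(clazz.getDeclaredMethods())
                .anyMatch(m -> m.getName().equals(methodName));
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Defaults are placeholders so the sketch runs without extra jars.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        String methodName = args.length > 1 ? args[1] : "valueOf";
        Class<?> clazz = Class.forName(className);
        System.out.println(className + " loaded from: " + originOf(clazz));
        System.out.println("declares " + methodName + ": " + declaresMethod(clazz, methodName));
    }
}
```

Run on the test classpath with the arguments com.sun.jersey.core.reflection.ReflectionHelper and classForNamePA, this should show which jar actually provides the class. Note that it only compares method names, so a method that exists with a different signature would still be reported as present.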
This is really weird, because - as I wrote before - the simple neo4jOnly
test ran a few days ago. Were there any changes concerning dependencies
in 0.10-SNAPSHOT?
However, the next thing which would fail is caused by the scala-library
version conflict.
Again, thanks for your help.
Best,
Martin
[1] https://github.com/s1ck/flink-neo4j/tree/dependency_problem
[2] https://github.com/s1ck/flink-neo4j/blob/dependency_problem/src/test/java/org/apache/flink/api/java/io/neo4j/Neo4jInputTest.java#L32
On 12.11.2015 12:51, Robert Metzger wrote:
Sorry for the delay.
So the plan of this work is to add a neo4j connector into Flink, right?
While looking at the pom files of neo4j I found that it is GPLv3 licensed,
and Apache projects cannot depend on or link against GPL code [1].
So we cannot make the module part of the Flink source.
However, it's actually quite easy to publish code into Maven central, so you
could release it on your own into Maven.
If that is too much work for you, I can also start a github project like
"flink-gpl" with access to maven central where we can release stuff like
this.
Is this repository [2] your current work in progress on the dependency
issue?
Maybe the neo4j dependency expects scala 2.11 and there is no scala 2.10
build out. In this case, we could require Flink users to use the scala 2.11
build of Flink when they want to use neo4j.
I think I can help you much better as soon as I have your current pom file
+ code.
[1] http://www.apache.org/legal/resolved.html#category-a
[2] https://github.com/s1ck/flink-neo4j
On Wed, Nov 11, 2015 at 7:38 PM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:
Hi,
I am a bit stuck with that dependency problem. Any help would be
appreciated as I would like to continue working on the formats. Thanks!
Best,
Martin
On 07.11.2015 17:28, Martin Junghanns wrote:
Hi Robert,
Thank you for the hints. I tried to narrow down the error:
Flink version: 0.10-SNAPSHOT
Neo4j version: 2.3.0
I start with two dependencies:
flink-java
flink-gelly
(1) Add neo4j-harness and run basic example from Neo4j [1]
Leads to:
java.lang.ClassNotFoundException:
org.eclipse.jetty.server.ConnectionFactory
(2) I excluded jetty-server from flink-java and flink-gelly
It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:
java.lang.NoSuchMethodError:
org.eclipse.jetty.servlet.ServletContextHandler.<init>
(3) I excluded jetty-servlet from flink-java and flink-gelly
It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1)
Leads to:
java.lang.NoSuchMethodError: scala.Predef$.$conforms()
(4) I excluded scala-library from flink-java and flink-gelly
It now uses scala-library:2.11.7 (was 2.10.4)
Now, the basic Neo4j example (without Flink) runs.
Next, I added Flink to the mix and wrote a simple test using
neo4j-harness features, ExecutionEnvironment and my InputFormat.
Leads to:
java.lang.NoSuchMethodError:
scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
at akka.actor.RootActorPath.$div(ActorPath.scala:159)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
at scala.util.Try$.apply(Try.scala:192)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:231)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.Range.foreach(Range.scala:166)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
This is where I don't know what to exclude next. Seems that some
components (akka?) need scala 2.10.4 and Neo4j (cypher) depends on scala
2.11.7.
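To see whether two versions of the same library end up on the classpath at once, one option is to ask the class loader for every location that provides a given class file. This is only a sketch under assumptions: DuplicateClassFinder and locationsOf are hypothetical names, and the default class is a placeholder so the example runs without scala-library present.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic helper; not part of Flink or Neo4j. */
class DuplicateClassFinder {

    /** Lists every classpath location that contains the class file for className. */
    static List<String> locationsOf(String className) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        List<String> locations = new ArrayList<>();
        for (URL url : Collections.list(loader.getResources(resource))) {
            locations.add(url.toString());
        }
        return locations;
    }

    public static void main(String[] args) throws IOException {
        // Pass e.g. scala.Predef$ when running on the test classpath.
        String className = args.length > 0 ? args[0] : "java.lang.String";
        List<String> locations = locationsOf(className);
        System.out.println(className + " found in " + locations.size() + " location(s):");
        locations.forEach(location -> System.out.println("  " + location));
    }
}
```

More than one location for a class such as scala.Predef$ would suggest that both Scala versions are present, in which case whichever jar comes first on the classpath wins.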
How can I make use of the maven shade plugin in that case?
Again, thank you!
Cheers,
Martin
[1] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
(testMyExtensionWithFunctionFixture())
On 06.11.2015 16:17, Robert Metzger wrote:
Hi Martin,
what exactly were the issues you were facing with the dependency
conflicts?
There is a way around these issues, and it's called shading:
https://maven.apache.org/plugins/maven-shade-plugin/
In Flink we have several shaded modules (curator, hadoop). We could add a
neo4j-harness-shaded module which relocates conflicting dependencies into a
different namespace. That way, you can execute different versions of the
same library (jetty, scala) at the same time.
Since the module would only contain dependencies needed at test time, we
could exclude it from releases.
Regarding Scala, it would be fine to execute the neo4j tests only with
scala 2.11 builds. It's not hard to control this using maven build profiles.
Do you really need jetty? Is neo4j starting the web interface also for the
tests?
Regards,
Robert
On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:
Hi,
I could use your input on testing the input format with Flink.
As I already mentioned, Neo4j offers a dedicated module (neo4j-harness)
for unit testing server extensions / REST applications. The problem here
is that the dependencies of Flink conflict with the dependencies of
neo4j-harness (e.g. jetty, scala-library). I tried to figure out what
combination could run using the maven exclude mechanism, but no success.
So I thought about workarounds:
(1) Michael Hunger (neo4j) started a project and invited me to
contribute [1]. What it does during tests is:
- download a neo4j-<version>.tar.gz into a temp folder
- extract and start a neo4j instance
- run tests
- stop and discard neo4j
I like the concept, but I guess the problem is that it runs outside of
maven and I guess downloading from external resources (especially in
travis-ci) could lead to problems.
(2) I had a look into the other input formats. flink-hbase uses examples
instead of unit tests. This could be an option as long as there is no
clean solution for "real" unit testing.
What do you think?
Cheers, Martin
[1] https://github.com/jexp/neo4j-starter
On 03.11.2015 01:18, Stephan Ewen wrote:
Wow, very nice results :-)
This input format alone is probably a very useful contribution, so I would
open a contribution there once you manage to get a few tests running.
I know little about neo4j. Is there a way to read cypher query results in
parallel? (Most systems do not expose such an interface, but maybe neo4j is
special there.)
I recall Martin Neumann asking at some point about a way to create dense
contiguous unique IDs for creating graphs that can be bulk-imported into
neo4j. There is code for that in the data set utils, which may be valuable
for an output format.
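To illustrate the idea of dense contiguous IDs, here is a plain-Java sketch. It does not use Flink's data set utils and is not their implementation; DenseIdAssigner and assignIds are hypothetical names, and the nested lists stand in for the partitions of a distributed data set. The approach is to count the elements per partition first, turn the counts into start offsets, and then let every partition number its elements from its own offset.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not the code from Flink's data set utils. */
class DenseIdAssigner {

    /** Pairs every element with an id from 0..n-1, with no gaps and no duplicates. */
    static <T> List<Map.Entry<Long, T>> assignIds(List<List<T>> partitions) {
        // Pass 1: a prefix sum over the partition sizes gives each partition its first id.
        long[] offsets = new long[partitions.size()];
        long running = 0;
        for (int i = 0; i < partitions.size(); i++) {
            offsets[i] = running;
            running += partitions.get(i).size();
        }
        // Pass 2: each partition counts up from its own offset. The partitions do not
        // depend on each other here, so a distributed version could do this in parallel.
        List<Map.Entry<Long, T>> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            long id = offsets[i];
            for (T element : partitions.get(i)) {
                result.add(new AbstractMap.SimpleEntry<>(id++, element));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"),
                Collections.<String>emptyList(),
                Arrays.asList("c"));
        for (Map.Entry<Long, String> entry : assignIds(partitions)) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```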
On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <m.jungha...@mailbox.org>
wrote:
Hi,
I wanted to give you a little update. I created a non-parallel
InputFormat which reads Cypher results from Neo4j into Tuples [1].
It can be used like the JDBCInputFormat:
String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
Neo4jInputFormat.buildNeo4jInputFormat()
.setRestURI(restURI)
.setCypherQuery(q)
.setUsername("neo4j")
.setPassword("test")
.setConnectTimeout(1000)
.setReadTimeout(1000)
.finish();
At the moment, to run the tests, a Neo4j instance needs to be up and running.
I tried to get neo4j-harness [2] into the project, but there are some
dependency conflicts which I need to figure out.
I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j
running on the same machine. My dataset is the Polish wiki dump [3]
which consists of 430,602 pages and 2,727,302 links. The protocol is:
1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
2) Convert Tuple2 into Tuple3<Integer, Integer, Double> for edge value
3) Create Gelly graph from Tuple3, init vertex values to 1.0
4) Run PageRank with beta=0.85 and 5 iterations
This takes about 22 seconds on my machine which is very promising.
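For readers who want to see what steps 2) to 4) compute, here is a small plain-Java sketch without Flink. It is a generic textbook-style PageRank, not Gelly's implementation. It assumes that each edge value is 1 / out-degree of its source vertex, and it ignores the rank of vertices without outgoing edges. PageRankSketch and pageRank are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of the benchmark protocol; not Gelly's PageRank. */
class PageRankSketch {

    /** Runs a fixed number of PageRank iterations over (source, target) id pairs. */
    static Map<Integer, Double> pageRank(int[][] edges, double beta, int iterations) {
        Map<Integer, List<Integer>> targets = new HashMap<>();
        Set<Integer> vertices = new TreeSet<>();
        for (int[] edge : edges) {
            targets.computeIfAbsent(edge[0], k -> new ArrayList<>()).add(edge[1]);
            vertices.add(edge[0]);
            vertices.add(edge[1]);
        }
        // Step 3: every vertex value starts at 1.0.
        Map<Integer, Double> rank = new HashMap<>();
        for (Integer vertex : vertices) {
            rank.put(vertex, 1.0);
        }
        // Step 4: each vertex sends beta * rank / out-degree along every outgoing edge.
        for (int i = 0; i < iterations; i++) {
            Map<Integer, Double> next = new HashMap<>();
            for (Integer vertex : vertices) {
                next.put(vertex, 1.0 - beta);
            }
            for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
                double share = beta * rank.get(entry.getKey()) / entry.getValue().size();
                for (Integer target : entry.getValue()) {
                    next.merge(target, share, Double::sum);
                }
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Toy graph; in the benchmark the pairs come from the Cypher result.
        int[][] edges = {{0, 1}, {1, 2}, {2, 0}, {0, 2}};
        System.out.println(pageRank(edges, 0.85, 5));
    }
}
```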
Next steps are:
- OutputFormat
- Better Unit tests (integrate neo4j-harness)
- Bigger graphs :)
Any ideas and suggestions are of course highly appreciated :)
Best,
Martin
[1] https://github.com/s1ck/flink-neo4j
[2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
[3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
On 29.10.2015 14:51, Vasiliki Kalavri wrote:
Hello everyone,
Martin, Martin, Alex (cc'ed) and I have started discussing
implementing a neo4j-Flink connector. I've opened a corresponding JIRA
(FLINK-2941) containing an initial document [1], but we'd also like to
share our ideas here to engage the community and get your feedback.
We've had a skype call today and I will try to summarize some of the key
points here. The main use-cases we see are the following:
- Use Flink for ETL / creating the graph and then insert it into a graph
database, like neo4j, for querying and search.
- Query neo4j on some topic or search the graph for patterns and extract a
subgraph, on which we'd then like to run some iterative graph analysis
task. This is where Flink/Gelly can help, by complementing the querying
(neo4j) with efficient iterative computation.
We all agreed that the main challenge is efficiently getting the data out
of neo4j and into Flink. There have been some attempts to do similar things
with neo4j and Spark, but the performance results are not very promising:
- Mazerunner [2] is using HDFS for communication. We think that it's not
worth going in this direction, as dumping the neo4j database to HDFS and
then reading it back into Flink would probably be terribly slow.
- In [3], you can see Michael Hunger's findings on using neo's HTTP
interface to import data into Spark, run PageRank and then put data back
into neo4j. It seems that this took > 2h for a 125m edge graph. The main
bottlenecks appear to be (1) reading the data as an RDD, which had to be
performed in small batches to avoid OOM errors, and (2) the PageRank
computation itself, which seems weird to me.
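One way to read a large result in small batches is to page through it with Cypher's SKIP and LIMIT clauses. The following is only a sketch of that idea and not what was done in [3]. CypherBatcher and pagedQueries are hypothetical names, and the total row count is assumed to be known up front (for example from a separate count query).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of any existing connector. */
class CypherBatcher {

    /** Splits one Cypher query into SKIP/LIMIT pages that together cover totalRows rows. */
    static List<String> pagedQueries(String baseQuery, long totalRows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> queries = new ArrayList<>();
        for (long skip = 0; skip < totalRows; skip += batchSize) {
            queries.add(baseQuery + " SKIP " + skip + " LIMIT " + batchSize);
        }
        return queries;
    }

    public static void main(String[] args) {
        // Query, row count and batch size are made-up example values.
        String query = "MATCH (a)-->(b) RETURN id(a), id(b)";
        pagedQueries(query, 1_000_000L, 250_000).forEach(System.out::println);
    }
}
```

A caveat: without an ORDER BY, the row order is not guaranteed to be the same for every page, so pages might overlap or miss rows, and each page still has to skip over all rows before it. Whether this is faster than a single streamed query would need to be measured.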
We decided to experiment with neo4j HTTP and Flink and we'll report back
when we have some results.
In the meantime, if you have any ideas on how we could speed up reading
from neo4j or any suggestion on approaches that I haven't mentioned, please
feel free to reply to this e-mail or add your comment in the shared
document.
Cheers,
-Vasia.
[1]: https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
[2]: https://github.com/kbastani/neo4j-mazerunner
[3]: https://gist.github.com/jexp/0dfad34d49a16000e804