Have a look at my updated version of your code: https://github.com/rmetzger/scratch/tree/dependency_problem

It now executes both tests; however, I was not able to get the second test to pass. It seems that Neo4j's web server returns a 500 status code when open()'ing the connection. I'm not sure how to debug this issue.
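If it helps, here is the kind of minimal standalone probe (plain JDK, no Flink involved) I would use to look at the actual error body behind the 500. The URL assumes the default REST root of a local Neo4j 2.x server; adjust host, port and path to whatever the test harness spins up:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class Neo4jHttpProbe {

    public static void main(String[] args) throws Exception {
        // Assumed default REST root of a local Neo4j 2.x server.
        URL url = new URL("http://localhost:7474/db/data/");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Accept", "application/json");
        // If auth is enabled (default since 2.2), add a Basic auth header, e.g.:
        // conn.setRequestProperty("Authorization", "Basic " +
        //     javax.xml.bind.DatatypeConverter.printBase64Binary("neo4j:test".getBytes()));

        int status = conn.getResponseCode();
        System.out.println("HTTP status: " + status);

        // For status codes >= 400 the payload is on the error stream;
        // the body usually contains the server-side exception.
        InputStream in = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
        if (in == null) {
            return;
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}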
On Thu, Nov 12, 2015 at 2:19 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote:

> Hi Robert,
>
> Thank you for the reply! At the moment we just "play" with Neo4j and Flink, but the InputFormat shall eventually be available in Flink.
>
> Concerning the license: I did not think of that, but yes, I can make it available in Maven central. I just need to find out how to do this.
>
> I created a branch that includes the dependency problem [1]. There is a test case "neo4jOnly" [2] which does not use Flink and works fine in a project where only neo4j-harness is included. However, when I add flink-java and flink-gelly (excluding flink-clients because of jetty) to the project, the neo4jOnly test fails with:
>
> org.neo4j.server.ServerStartupException: Starting Neo4j failed: com.sun.jersey.core.reflection.ReflectionHelper.classForNamePA(Ljava/lang/String;)Ljava/security/PrivilegedAction;
>
> I compared the dependencies of the "clean" neo4j-harness project and made sure the dependencies and versions are the same. ReflectionHelper is part of jersey-core, which is included.
>
> This is really weird, because - as I wrote before - the simple neo4jOnly test ran a few days ago. Were there any changes concerning dependencies in 0.10-SNAPSHOT?
> However, the next thing that would fail is caused by the scala-library version conflict.
>
> Again, thanks for your help.
>
> Best,
> Martin
>
> [1] https://github.com/s1ck/flink-neo4j/tree/dependency_problem
> [2] https://github.com/s1ck/flink-neo4j/blob/dependency_problem/src/test/java/org/apache/flink/api/java/io/neo4j/Neo4jInputTest.java#L32
>
>
> On 12.11.2015 12:51, Robert Metzger wrote:
>
>> Sorry for the delay.
>> So the plan of this work is to add a Neo4j connector to Flink, right?
>>
>> While looking at the pom files of Neo4j, I found that it is GPLv3 licensed, and Apache projects cannot depend on or link with GPL code [1]. So we cannot make the module part of the Flink source.
>> However, it is actually quite easy to publish code to Maven central, so you could release it on your own. If that is too much work for you, I can also start a GitHub project like "flink-gpl" with access to Maven central where we can release stuff like this.
>>
>> Is this repository [2] your current work in progress on the dependency issue?
>> Maybe the neo4j dependency expects Scala 2.11 and there is no Scala 2.10 build available. In that case, we could require Flink users to use the Scala 2.11 build of Flink when they want to use Neo4j.
>> I think I can help you much better as soon as I have your current pom file + code.
>>
>> [1] http://www.apache.org/legal/resolved.html#category-a
>> [2] https://github.com/s1ck/flink-neo4j
>>
>>
>> On Wed, Nov 11, 2015 at 7:38 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>
>>> Hi,
>>>
>>> I am a bit stuck with this dependency problem. Any help would be appreciated, as I would like to continue working on the formats. Thanks!
>>>
>>> Best,
>>> Martin
>>>
>>>
>>> On 07.11.2015 17:28, Martin Junghanns wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> Thank you for the hints.
>>>> I tried to narrow down the error:
>>>>
>>>> Flink version: 0.10-SNAPSHOT
>>>> Neo4j version: 2.3.0
>>>>
>>>> I start with two dependencies:
>>>> flink-java
>>>> flink-gelly
>>>>
>>>> (1) Add neo4j-harness and run the basic example from Neo4j [1].
>>>> Leads to:
>>>>
>>>> java.lang.ClassNotFoundException: org.eclipse.jetty.server.ConnectionFactory
>>>>
>>>> (2) I excluded jetty-server from flink-java and flink-gelly.
>>>> It now uses jetty-server:9.2.4.v20141103 (was 8.0.0.M1).
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError: org.eclipse.jetty.servlet.ServletContextHandler.<init>
>>>>
>>>> (3) I excluded jetty-servlet from flink-java and flink-gelly.
>>>> It now uses jetty-servlet:9.2.4.v20141103 (was 8.0.0.M1).
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError: scala.Predef$.$conforms()
>>>>
>>>> (4) I excluded scala-library from flink-java and flink-gelly.
>>>> It now uses scala-library:2.11.7 (was 2.10.4).
>>>>
>>>> Now the basic Neo4j example (without Flink) runs.
>>>>
>>>> Next, I added Flink to the mix and wrote a simple test using neo4j-harness features, ExecutionEnvironment and my InputFormat.
>>>> Leads to:
>>>>
>>>> java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
>>>>     at akka.actor.ActorCell$.<init>(ActorCell.scala:336)
>>>>     at akka.actor.ActorCell$.<clinit>(ActorCell.scala)
>>>>     at akka.actor.RootActorPath.$div(ActorPath.scala:159)
>>>>     at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:464)
>>>>     at akka.actor.LocalActorRefProvider.<init>(ActorRefProvider.scala:452)
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>>>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:78)
>>>>     at scala.util.Try$.apply(Try.scala:192)
>>>>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
>>>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>     at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
>>>>     at scala.util.Success.flatMap(Try.scala:231)
>>>>     at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
>>>>     at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585)
>>>>     at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578)
>>>>     at akka.actor.ActorSystem$.apply(ActorSystem.scala:142)
>>>>     at akka.actor.ActorSystem$.apply(ActorSystem.scala:119)
>>>>     at akka.actor.ActorSystem$.create(ActorSystem.scala:67)
>>>>     at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:84)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:203)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:232)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:232)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:237)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:235)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>>>>     at scala.collection.immutable.Range.foreach(Range.scala:166)
>>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:235)
>>>>     at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:226)
>>>>     at org.apache.flink.client.LocalExecutor.start(LocalExecutor.java:115)
>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:87)
>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:821)
>>>>     at org.apache.flink.api.java.io.neo4j.Neo4jInputTest.inputFormatTest(Neo4jInputTest.java:109)
>>>>
>>>> This is where I don't know what to exclude next. It seems that some components (akka?) need scala 2.10.4, while Neo4j (cypher) depends on scala 2.11.7.
>>>>
>>>> How can I make use of the maven shade plugin in this case?
>>>>
>>>> Again, thank you!
>>>>
>>>> Cheers,
>>>> Martin
>>>>
>>>> [1] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html (testMyExtensionWithFunctionFixture())
>>>>
>>>>
>>>> On 06.11.2015 16:17, Robert Metzger wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> what exactly were the issues you were facing with the dependency conflicts?
>>>>>
>>>>> There is a way around these issues, and it's called shading: https://maven.apache.org/plugins/maven-shade-plugin/
>>>>> In Flink we have several shaded modules (curator, hadoop). We could add a neo4j-harness-shaded module which relocates conflicting dependencies into a different namespace. That way, you can execute different versions of the same library (jetty, scala) at the same time.
>>>>> Since the module would only contain dependencies needed at test time, we could exclude it from releases.
>>>>>
>>>>> Regarding Scala, it would be fine to execute the Neo4j tests only with Scala 2.11 builds. It's not hard to control this using Maven build profiles.
>>>>> Do you really need jetty? Is Neo4j starting the web interface for the tests as well?
>>>>>
>>>>> Regards,
>>>>> Robert
>>>>>
>>>>>
>>>>> On Fri, Nov 6, 2015 at 4:09 PM, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I could use your input on testing the input format with Flink.
>>>>>>
>>>>>> As I already mentioned, Neo4j offers a dedicated module (neo4j-harness) for unit testing server extensions / REST applications. The problem here is that the dependencies of Flink conflict with the dependencies of neo4j-harness (e.g. jetty, scala-library). I tried to figure out what combination could run using the maven exclude mechanism, but with no success.
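>>>>>>
>>>>>> In case it helps with reproducing: to see which jar a conflicting class is actually loaded from at runtime, I use a small plain-Java check like the following (just a diagnostic sketch; the class names are examples):
>>>>>>
>>>>>> public class WhichJar {
>>>>>>
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         // Examples of classes involved in the conflicts; replace them with
>>>>>>         // whatever triggers the ClassNotFound/NoSuchMethodError.
>>>>>>         printSource("org.eclipse.jetty.servlet.ServletContextHandler");
>>>>>>         printSource("scala.Predef");
>>>>>>     }
>>>>>>
>>>>>>     private static void printSource(String className) throws ClassNotFoundException {
>>>>>>         Class<?> clazz = Class.forName(className);
>>>>>>         // getCodeSource() may be null for JDK bootstrap classes.
>>>>>>         java.security.CodeSource src = clazz.getProtectionDomain().getCodeSource();
>>>>>>         System.out.println(className + " -> "
>>>>>>             + (src == null ? "bootstrap classpath" : src.getLocation()));
>>>>>>     }
>>>>>> }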
>>>>>> So I thought about workarounds:
>>>>>>
>>>>>> (1) Michael Hunger (neo4j) started a project and invited me to contribute [1]. What it does during tests is:
>>>>>> - download a neo4j-<version>.tar.gz into a temp folder
>>>>>> - extract and start a neo4j instance
>>>>>> - run tests
>>>>>> - stop and discard neo4j
>>>>>>
>>>>>> I like the concept, but the problem is that it runs outside of maven, and I guess downloading from external resources (especially on travis-ci) could lead to problems.
>>>>>>
>>>>>> (2) I had a look at the other input formats. flink-hbase uses examples instead of unit tests. This could be an option as long as there is no clean solution for "real" unit testing.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Cheers, Martin
>>>>>>
>>>>>>
>>>>>> [1] https://github.com/jexp/neo4j-starter
>>>>>>
>>>>>>
>>>>>> On 03.11.2015 01:18, Stephan Ewen wrote:
>>>>>>
>>>>>>> Wow, very nice results :-)
>>>>>>>
>>>>>>> This input format alone is probably a very useful contribution, so I would open a contribution there once you manage to get a few tests running.
>>>>>>>
>>>>>>> I know little about neo4j; is there a way to read cypher query results in parallel? (Most systems do not expose such an interface, but maybe neo4j is special there.)
>>>>>>>
>>>>>>> I recall at some point in time Martin Neumann asking about a way to create dense contiguous unique IDs for creating graphs that can be bulk-imported into neo4j. There is code for that in the data set utils; this may be valuable for an output format.
>>>>>>>
>>>>>>> On Sat, Oct 31, 2015 at 9:51 AM, Martin Junghanns <m.jungha...@mailbox.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I wanted to give you a little update. I created a non-parallel InputFormat which reads Cypher results from Neo4j into Tuples [1]. It can be used like the JDBCInputFormat:
>>>>>>>>
>>>>>>>> String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2)";
>>>>>>>>
>>>>>>>> Neo4jInputFormat<Tuple2<Integer, Integer>> neoInput =
>>>>>>>>     Neo4jInputFormat.buildNeo4jInputFormat()
>>>>>>>>         .setRestURI(restURI)
>>>>>>>>         .setCypherQuery(q)
>>>>>>>>         .setUsername("neo4j")
>>>>>>>>         .setPassword("test")
>>>>>>>>         .setConnectTimeout(1000)
>>>>>>>>         .setReadTimeout(1000)
>>>>>>>>         .finish();
>>>>>>>>
>>>>>>>> At the moment, to run the tests, a Neo4j instance needs to be up and running. I tried to get neo4j-harness [2] into the project, but there are some dependency conflicts which I need to figure out.
>>>>>>>>
>>>>>>>> I ran a first benchmark on my laptop (4 cores, 12GB, SSD) with Neo4j running on the same machine. My dataset is the Polish wiki dump [3], which consists of 430,602 pages and 2,727,302 links. The protocol is:
>>>>>>>>
>>>>>>>> 1) Read edge ids from cold Neo4j into Tuple2<Integer, Integer>
>>>>>>>> 2) Convert Tuple2 into Tuple3<Integer, Integer, Double> for the edge value
>>>>>>>> 3) Create Gelly graph from Tuple3, init vertex values to 1.0
>>>>>>>> 4) Run PageRank with beta=0.85 and 5 iterations
>>>>>>>>
>>>>>>>> This takes about 22 seconds on my machine, which is very promising.
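>>>>>>>>
>>>>>>>> In code, the pipeline looks roughly like this (a sketch only: the fromTupleDataSet/PageRank calls assume Gelly's API in 0.10-SNAPSHOT and their exact signatures may differ; neoInput is the format built above):
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>>>> import org.apache.flink.api.java.DataSet;
>>>>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>>>>>> import org.apache.flink.graph.Graph;
>>>>>>>> import org.apache.flink.graph.Vertex;
>>>>>>>> import org.apache.flink.graph.library.PageRank;
>>>>>>>>
>>>>>>>> // 1) read edge ids from Neo4j
>>>>>>>> // (depending on the format, a TypeInformation hint may be needed here)
>>>>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataSet<Tuple2<Integer, Integer>> edges = env.createInput(neoInput);
>>>>>>>>
>>>>>>>> // 2) attach a dummy edge value
>>>>>>>> DataSet<Tuple3<Integer, Integer, Double>> weighted = edges
>>>>>>>>     .map(new MapFunction<Tuple2<Integer, Integer>, Tuple3<Integer, Integer, Double>>() {
>>>>>>>>         @Override
>>>>>>>>         public Tuple3<Integer, Integer, Double> map(Tuple2<Integer, Integer> e) {
>>>>>>>>             return new Tuple3<>(e.f0, e.f1, 1.0);
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>>
>>>>>>>> // 3) build the Gelly graph, init vertex values to 1.0
>>>>>>>> Graph<Integer, Double, Double> graph =
>>>>>>>>     Graph.fromTupleDataSet(weighted, new MapFunction<Integer, Double>() {
>>>>>>>>         @Override
>>>>>>>>         public Double map(Integer vertexId) {
>>>>>>>>             return 1.0;
>>>>>>>>         }
>>>>>>>>     }, env);
>>>>>>>>
>>>>>>>> // 4) run PageRank (beta = 0.85, 5 iterations)
>>>>>>>> DataSet<Vertex<Integer, Double>> ranks = graph.run(new PageRank<Integer>(0.85, 5));
>>>>>>>> ranks.print();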
>>>>>>>>
>>>>>>>> Next steps are:
>>>>>>>> - OutputFormat
>>>>>>>> - Better unit tests (integrate neo4j-harness)
>>>>>>>> - Bigger graphs :)
>>>>>>>>
>>>>>>>> Any ideas and suggestions are of course highly appreciated :)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Martin
>>>>>>>>
>>>>>>>>
>>>>>>>> [1] https://github.com/s1ck/flink-neo4j
>>>>>>>> [2] http://neo4j.com/docs/stable/server-unmanaged-extensions-testing.html
>>>>>>>> [3] plwiktionary-20151002-pages-articles-multistream.xml.bz2
>>>>>>>>
>>>>>>>>
>>>>>>>> On 29.10.2015 14:51, Vasiliki Kalavri wrote:
>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> Martin, Martin, Alex (cc'ed) and myself have started discussing implementing a neo4j-Flink connector. I've opened a corresponding JIRA (FLINK-2941) containing an initial document [1], but we'd also like to share our ideas here to engage the community and get your feedback.
>>>>>>>>>
>>>>>>>>> We've had a skype call today and I will try to summarize some of the key points here. The main use-cases we see are the following:
>>>>>>>>>
>>>>>>>>> - Use Flink for ETL / creating the graph and then insert it into a graph database, like neo4j, for querying and search.
>>>>>>>>> - Query neo4j on some topic or search the graph for patterns and extract a subgraph, on which we'd then like to run some iterative graph analysis task. This is where Flink/Gelly can help, by complementing the querying (neo4j) with efficient iterative computation.
>>>>>>>>>
>>>>>>>>> We all agreed that the main challenge is efficiently getting the data out of neo4j and into Flink. There have been some attempts to do similar things with neo4j and Spark, but the performance results are not very promising:
>>>>>>>>>
>>>>>>>>> - Mazerunner [2] is using HDFS for communication. We don't think it's worth going in this direction, as dumping the neo4j database to HDFS and then reading it back into Flink would probably be terribly slow.
>>>>>>>>> - In [3], you can see Michael Hunger's findings on using neo's HTTP interface to import data into Spark, run PageRank and then put the data back into neo4j. It seems that this took > 2h for a 125m edge graph. The main bottlenecks appear to be (1) reading the data as an RDD => this had to be performed in small batches to avoid OOM errors and (2) the PageRank computation itself, which seems weird to me.
>>>>>>>>>
>>>>>>>>> We decided to experiment with neo4j HTTP and Flink and we'll report back when we have some results.
>>>>>>>>>
>>>>>>>>> In the meantime, if you have any ideas on how we could speed up reading from neo4j or any suggestion on approaches that I haven't mentioned, please feel free to reply to this e-mail or add your comment in the shared document.
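>>>>>>>>>
>>>>>>>>> For instance, one simple thing to try against the OOM behaviour seen in [3] would be to paginate the extraction query and fetch the edges in bounded pages. Just a strawman sketch; the query, counts and batch size are made up:
>>>>>>>>>
>>>>>>>>> // Hypothetical sketch: generate paged Cypher queries so edges are
>>>>>>>>> // fetched in bounded batches instead of one huge result set.
>>>>>>>>> // Note: without an ORDER BY, SKIP/LIMIT paging is not guaranteed
>>>>>>>>> // to be stable across queries.
>>>>>>>>> final int batchSize = 100000;            // made-up value, to be tuned
>>>>>>>>> final long totalEdges = 125_000_000L;    // edge count from [3]
>>>>>>>>> for (long offset = 0; offset < totalEdges; offset += batchSize) {
>>>>>>>>>     String q = "MATCH (p1:Page)-[:Link]->(p2) RETURN id(p1), id(p2) "
>>>>>>>>>         + "SKIP " + offset + " LIMIT " + batchSize;
>>>>>>>>>     // submit q (e.g. one input split / request per page) and
>>>>>>>>>     // union the partial results on the Flink side
>>>>>>>>> }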
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> -Vasia.
>>>>>>>>>
>>>>>>>>> [1]: https://docs.google.com/document/d/13qT_e-y8aTNWQnD43jRBq1074Y1LggPNDsic_Obwc28/edit?usp=sharing
>>>>>>>>> [2]: https://github.com/kbastani/neo4j-mazerunner
>>>>>>>>> [3]: https://gist.github.com/jexp/0dfad34d49a16000e804