Hi Kien Truong,

I found a solution to your problem. It's actually a bug in Flink's optimizer. Thanks for spotting it :-)
I've opened a pull request to fix it (https://github.com/apache/flink/pull/1388). The fix will also be included in the upcoming `0.10.1` release. After the pull request has been merged, you can try it out by either checking out the current master and building Flink yourself (see the P.S. below for a rough sketch of the build steps) or waiting until the SNAPSHOT binaries have been updated (usually overnight).

Cheers,
Till

On Thu, Nov 19, 2015 at 2:05 PM, Truong Duc Kien <duckientru...@gmail.com> wrote:

> Hi Till,
>
> I have narrowed down a minimal test case; you will need the
> flink-gelly-scala package to run this.
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.scala._
> import org.apache.flink.graph._
> import org.apache.flink.graph.scala.Graph
> import org.apache.flink.types.NullValue
> import org.apache.flink.util.Collector
>
> object BulkIterationBug {
>   def main(args: Array[String]): Unit = {
>     val environment = ExecutionEnvironment.getExecutionEnvironment
>     // Read the graph from an edge list file; vertex values are
>     // initialized from the vertex ids.
>     val g = Graph.fromCsvReader[Long, Long, NullValue](
>       pathEdges = "edge.in",
>       vertexValueInitializer = new MapFunction[Long, Long] {
>         override def map(t: Long): Long = t
>       },
>       fieldDelimiterEdges = " ",
>       lineDelimiterEdges = "\n",
>       ignoreCommentsEdges = "%",
>       env = environment
>     )
>     val vertices = g.getVertices
>     val edges = g.getEdges
>     // A bulk iteration whose step function coGroups the partial solution
>     // with the constant edge data set. This triggers the CompilerException.
>     val data = vertices.iterate(1) {
>       (it) => {
>         it.coGroup(edges).where(0).equalTo(0) {
>           (first: Iterator[Vertex[Long, Long]],
>            second: Iterator[Edge[Long, NullValue]],
>            collector: Collector[Vertex[Long, Long]]) => {
>             if (first.hasNext) {
>               collector.collect(first.next)
>             }
>           }
>         }
>       }
>     }
>     println(data.collect())
>   }
> }
>
> The input file "edge.in" contains only one line:
>
> 1 2
>
> Thanks,
> Kien Truong
>
> On 11/19/2015 09:36 AM, Till Rohrmann wrote:
>> Hi Kien Truong,
>>
>> could you share the problematic code with us?
>>
>> Cheers,
>> Till
>>
>> On Nov 18, 2015 9:54 PM, "Truong Duc Kien" <duckientru...@gmail.com> wrote:
>>> Hi,
>>>
>>> I'm hitting a CompilerException with some of my data sets, but not all
>>> of them.
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
>>> No plan meeting the requirements could be created @ Bulk Iteration (Bulk
>>> Iteration) (1:null). Most likely reason: Too restrictive plan hints.
>>>
>>> Can I have some hints on how to troubleshoot this?
>>>
>>> Thanks,
>>> Kien Truong
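P.S. A rough sketch of building from the current master, assuming a standard Git/Maven setup (exact paths and flags may differ on your machine):

    git clone https://github.com/apache/flink.git
    cd flink
    mvn clean install -DskipTests

The built distribution should then show up under flink-dist/target.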